From 3095ea9c3d155bfc89197d2628eb818a55c2755d Mon Sep 17 00:00:00 2001 From: jif Date: Fri, 26 Jun 2026 01:36:44 +0100 Subject: [PATCH] Project selected plugin runtime by environment availability (#30093) ## Why Selected plugin metadata is stable, but MCP processes are live runtime state. They need different lifetimes: - the MCP extension caches manifest, MCP, and connector declarations for each stable selected root; - each model step projects that cached metadata through the roots that resolved as ready for that exact step; - the MCP manager is rebuilt only when that availability projection changes. This matches executor skills: both features consume the same resolved step roots instead of inferring readiness from the turn's selected environments. ## Behavior ```text E1 not ready for this step -> no E1 MCP servers or connectors -> cached plugin metadata stays in ext/mcp E1 becomes ready -> reuse cached metadata -> publish one MCP runtime containing E1 capabilities same ready roots on the next step -> reuse the exact runtime; no rediscovery and no MCP restart resume -> create new extension thread state and a new MCP runtime ``` All model-facing consumers use the same step snapshot: ```text resolved selected roots | v extension MCP/connector projection | v { MCP config, connector snapshot, MCP manager } | +-> advertise model tools +-> build app/connector tools +-> execute MCP calls ``` ## Cache contract The existing MCP extension owns a cache keyed by the full `SelectedCapabilityRoot`: ```rust let state = thread_store.get_or_init(SelectedExecutorPluginMcpState::default); ``` The cache lives with extension thread state. Environment availability filters projection but does not invalidate metadata. Resume creates new thread state. There is no file watcher or executor generation because contents behind a stable environment/root are assumed stable. ## What changes - Keeps executor plugin discovery and cached metadata in `ext/mcp`. - Caches MCP and connector declarations together per selected root. - Uses the step's already-resolved capability roots, including lazy environments that are not turn environments. - Reuses the current MCP runtime when the ready-root projection is unchanged. - Uses the same step MCP manager and connector snapshot for model-visible tools and execution. - Resolves direct thread-scoped MCP requests from the current selected-root projection. ## Deliberately out of scope - `app/list` remains based on the latest global host-plugin state; this PR does not make its response or notifications thread-specific. - `required = true` startup semantics do not apply to delayed executor MCP activation. - No filesystem/content invalidation. - No transport-disconnect watcher. - No executor generations or environment replacement semantics. - No client sharing across complete manager replacements. ## Stack 1. Extension-owned World State sections. 2. Project executor skills through World State. 3. Pin one MCP runtime to each model step. 4. **This PR:** project selected MCP and connector state from extension-owned metadata. 5. Integration coverage for selected capability availability and resume. ## Verification - `selected_plugin_servers_use_managed_requirements_for_the_selected_root_id` - The stacked integration PR covers unavailable to ready activation, unchanged-runtime reuse, skills, MCP tools, connector attribution, and cold resume. --- codex-rs/Cargo.lock | 1 + codex-rs/Cargo.toml | 1 + .../src/request_processors/mcp_processor.rs | 23 +- .../request_processors/thread_processor.rs | 1 - .../app-server/tests/suite/v2/executor_mcp.rs | 1 - codex-rs/core/src/codex_thread.rs | 20 +- codex-rs/core/src/mcp.rs | 45 +++- codex-rs/core/src/prompt_debug.rs | 4 +- codex-rs/core/src/session/mcp.rs | 196 ++++++++++++++---- codex-rs/core/src/session/mcp_runtime.rs | 8 + codex-rs/core/src/session/mod.rs | 19 +- codex-rs/core/src/session/session.rs | 10 +- codex-rs/core/src/session/tests.rs | 3 + codex-rs/core/src/session/turn.rs | 27 +-- codex-rs/core/src/state/service.rs | 14 +- codex-rs/core/src/thread_manager_tests.rs | 24 ++- .../ext/extension-api/src/contributors/mcp.rs | 47 ++++- codex-rs/ext/mcp/Cargo.toml | 1 + codex-rs/ext/mcp/src/executor_plugin.rs | 154 ++++++++++---- codex-rs/ext/mcp/src/lib.rs | 7 - codex-rs/ext/mcp/tests/executor_plugin_mcp.rs | 12 +- 21 files changed, 455 insertions(+), 163 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index b14702282..75c02adfb 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -3359,6 +3359,7 @@ name = "codex-mcp-extension" version = "0.0.0" dependencies = [ "codex-config", + "codex-connectors-extension", "codex-core", "codex-core-plugins", "codex-exec-server", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index ef17cb9ce..0824f7db7 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -166,6 +166,7 @@ codex-code-mode-protocol = { path = "code-mode-protocol" } codex-home = { path = "codex-home" } codex-config = { path = "config" } codex-connectors = { path = "connectors" } +codex-connectors-extension = { path = "ext/connectors" } codex-context-fragments = { path = "context-fragments" } codex-core = { path = "core" } codex-core-api = { path = "core-api" } diff --git a/codex-rs/app-server/src/request_processors/mcp_processor.rs b/codex-rs/app-server/src/request_processors/mcp_processor.rs index abefd2601..ac2125c2c 100644 --- a/codex-rs/app-server/src/request_processors/mcp_processor.rs +++ b/codex-rs/app-server/src/request_processors/mcp_processor.rs @@ -124,7 +124,7 @@ impl McpRequestProcessor { let (mcp_config, runtime_context) = match thread_id.as_deref() { Some(thread_id) => { let (_, thread) = self.load_thread(thread_id).await?; - let runtime = thread.current_mcp_runtime(); + let runtime = thread.current_mcp_runtime().await; (runtime.config().clone(), runtime.runtime_context().clone()) } None => { @@ -246,14 +246,21 @@ impl McpRequestProcessor { let mcp_manager = self.thread_manager.mcp_manager(); let codex_apps_tools_cache = mcp_manager.codex_apps_tools_cache(); let auth = self.auth_manager.auth().await; - let mcp_config = match thread { - Some(thread) => thread.runtime_mcp_config(&config).await, - None => mcp_manager.runtime_config(&config).await, + let (mcp_config, runtime_context) = match thread { + Some(thread) => { + let mcp_config = thread.runtime_mcp_config(&config).await; + let runtime = thread.current_mcp_runtime().await; + (mcp_config, runtime.runtime_context().clone()) + } + None => { + let mcp_config = mcp_manager.runtime_config(&config).await; + let runtime_context = McpRuntimeContext::new( + self.thread_manager.environment_manager(), + config.cwd.to_path_buf(), + ); + (mcp_config, runtime_context) + } }; - let runtime_context = McpRuntimeContext::new( - self.thread_manager.environment_manager(), - config.cwd.to_path_buf(), - ); tokio::spawn(async move { Self::list_mcp_server_status_task( diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 0fc37d1a4..39283bb35 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1155,7 +1155,6 @@ impl ThreadRequestProcessor { let mut thread_extension_init = ExtensionDataInit::new(); if !selected_capability_roots.is_empty() { thread_extension_init.insert(selected_capability_roots); - codex_mcp_extension::initialize_executor_plugin_thread_data(&mut thread_extension_init); } let create_thread_started_at = std::time::Instant::now(); let NewThread { diff --git a/codex-rs/app-server/tests/suite/v2/executor_mcp.rs b/codex-rs/app-server/tests/suite/v2/executor_mcp.rs index 67d97bc24..aa650c3ec 100644 --- a/codex-rs/app-server/tests/suite/v2/executor_mcp.rs +++ b/codex-rs/app-server/tests/suite/v2/executor_mcp.rs @@ -191,7 +191,6 @@ HTTP_PROXY = {http_proxy} ) .await?; - std::fs::write(plugin.path().join(".mcp.json"), r#"{"mcpServers":{}}"#)?; let config_path = codex_home.path().join("config.toml"); let mut config = std::fs::read_to_string(&config_path)?; config.push_str(&format!( diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 46b38af15..90555db91 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -595,8 +595,14 @@ impl CodexThread { } /// Returns the exact MCP config, environment bindings, and manager most recently published. - pub fn current_mcp_runtime(&self) -> Arc { - self.codex.session.services.latest_mcp_runtime() + pub async fn current_mcp_runtime(&self) -> Arc { + let turn_context = self.codex.session.new_default_turn().await; + self.codex + .session + .capture_step_context(turn_context) + .await + .mcp + .clone() } pub fn multi_agent_version(&self) -> Option { @@ -620,8 +626,9 @@ impl CodexThread { uri: &str, ) -> anyhow::Result { let result = self - .codex - .session + .current_mcp_runtime() + .await + .manager_arc() .read_resource(server, ReadResourceRequestParams::new(uri)) .await?; @@ -635,8 +642,9 @@ impl CodexThread { arguments: Option, meta: Option, ) -> anyhow::Result { - self.codex - .session + self.current_mcp_runtime() + .await + .manager_arc() .call_tool(server, tool, arguments, meta) .await } diff --git a/codex-rs/core/src/mcp.rs b/codex-rs/core/src/mcp.rs index 658c5471e..45ca0d563 100644 --- a/codex-rs/core/src/mcp.rs +++ b/codex-rs/core/src/mcp.rs @@ -3,7 +3,10 @@ use std::sync::Arc; use crate::config::Config; use codex_config::McpServerConfig; +use codex_connectors::ConnectorSnapshot; +use codex_connectors::PluginConnectorSource; use codex_core_plugins::PluginsManager; +use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionRegistry; use codex_extension_api::McpServerContribution; @@ -18,6 +21,7 @@ use codex_mcp::McpServerRegistration; use codex_mcp::codex_apps_mcp_server_config; use codex_mcp::configured_mcp_servers; use codex_mcp::effective_mcp_servers; +use codex_plugin::AppConnectorId; const LEGACY_CODEX_APPS_REGISTRATION_ID: &str = "legacy_codex_apps"; @@ -69,28 +73,32 @@ impl McpManager { /// Returns the MCP config after applying compatibility built-ins and /// runtime-only extension overlays. pub async fn runtime_config(&self, config: &Config) -> McpConfig { - self.runtime_config_with_context(config, /*thread_init*/ None) + self.runtime_config_with_context(McpServerContributionContext::global(config)) .await } - pub(crate) async fn runtime_config_for_thread( + pub(crate) async fn runtime_config_for_step( &self, config: &Config, thread_init: &ExtensionDataInit, + thread_store: &ExtensionData, + available_environment_ids: &[String], ) -> McpConfig { - self.runtime_config_with_context(config, Some(thread_init)) - .await + self.runtime_config_with_context(McpServerContributionContext::for_step( + config, + thread_init, + thread_store, + available_environment_ids, + )) + .await } async fn runtime_config_with_context( &self, - config: &Config, - thread_init: Option<&ExtensionDataInit>, + context: McpServerContributionContext<'_, Config>, ) -> McpConfig { - let context = match thread_init { - Some(thread_init) => McpServerContributionContext::for_thread(config, thread_init), - None => McpServerContributionContext::global(config), - }; + let config = context.config(); + let mut selected_plugin_connector_sources = Vec::new(); let mut selected_plugin_registrations = Vec::new(); let mut overlays = Vec::new(); // A contributor can emit multiple ordered actions, so order each action globally rather @@ -121,6 +129,17 @@ impl McpManager { *config, ), ), + McpServerContribution::SelectedPluginConnectors { + plugin_id, + plugin_display_name, + connector_ids, + } => selected_plugin_connector_sources.push( + PluginConnectorSource::from_connector_ids( + plugin_id, + plugin_display_name, + connector_ids.into_iter().map(AppConnectorId), + ), + ), McpServerContribution::Remove { name } => { overlays.push(OrderedMcpOverlay::Remove { contributor_id: contributor.id(), @@ -186,6 +205,12 @@ impl McpManager { ); } mcp_config.mcp_server_catalog = catalog; + mcp_config.connector_snapshot = + mcp_config + .connector_snapshot + .merged_with(&ConnectorSnapshot::from_plugin_sources( + selected_plugin_connector_sources, + )); mcp_config } diff --git a/codex-rs/core/src/prompt_debug.rs b/codex-rs/core/src/prompt_debug.rs index a866116a2..cac9bb41a 100644 --- a/codex-rs/core/src/prompt_debug.rs +++ b/codex-rs/core/src/prompt_debug.rs @@ -64,7 +64,7 @@ pub async fn build_prompt_input( ); let thread = thread_manager.start_thread(config).await?; - let output = build_prompt_input_from_session(thread.thread.codex.session.as_ref(), input).await; + let output = build_prompt_input_from_session(&thread.thread.codex.session, input).await; let shutdown = thread.thread.shutdown_and_wait().await; let _removed = thread_manager.remove_thread(&thread.thread_id).await; @@ -73,7 +73,7 @@ pub async fn build_prompt_input( } pub(crate) async fn build_prompt_input_from_session( - sess: &Session, + sess: &Arc, input: Vec, ) -> CodexResult> { let turn_context = sess.new_default_turn().await; diff --git a/codex-rs/core/src/session/mcp.rs b/codex-rs/core/src/session/mcp.rs index b9e87f0c8..50921f159 100644 --- a/codex-rs/core/src/session/mcp.rs +++ b/codex-rs/core/src/session/mcp.rs @@ -1,7 +1,9 @@ use super::*; +use codex_exec_server::ResolvedSelectedCapabilityRoot; use codex_mcp::ElicitationReviewRequest; use codex_mcp::ElicitationReviewer; use codex_mcp::ElicitationReviewerHandle; +use codex_protocol::capabilities::CapabilityRootLocation; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::mcp_approval_meta::APPROVAL_KIND_KEY as MCP_ELICITATION_APPROVAL_KIND_KEY; use codex_protocol::mcp_approval_meta::APPROVAL_KIND_MCP_TOOL_CALL as MCP_ELICITATION_APPROVAL_KIND_MCP_TOOL_CALL; @@ -75,9 +77,20 @@ impl ElicitationReviewer for GuardianMcpElicitationReviewer { impl Session { pub(crate) async fn runtime_mcp_config(&self, config: &Config) -> McpConfig { + let environments = self.services.turn_environments.snapshot().await; + let selected_capability_roots = self + .resolve_selected_capability_roots_for_step(&environments) + .await; + let available_environment_ids = + Self::available_selected_environment_ids(&selected_capability_roots); self.services .mcp_manager - .runtime_config_for_thread(config, &self.services.mcp_thread_init) + .runtime_config_for_step( + config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) .await } @@ -88,6 +101,62 @@ impl Session { codex_mcp::configured_mcp_servers(&self.runtime_mcp_config(config).await) } + #[expect( + clippy::await_holding_invalid_type, + reason = "MCP runtime comparison and publication must remain serialized" + )] + pub(crate) async fn mcp_runtime_for_step( + self: &Arc, + turn_context: &TurnContext, + environments: &TurnEnvironmentSnapshot, + selected_capability_roots: &[ResolvedSelectedCapabilityRoot], + ) -> Arc { + let available_environment_ids = + Self::available_selected_environment_ids(selected_capability_roots); + let current = self.services.latest_mcp_runtime(); + if current.available_environment_ids() == available_environment_ids { + return current; + } + + let _guard = self.services.mcp_projection_lock.lock().await; + let current = self.services.latest_mcp_runtime(); + if current.available_environment_ids() == available_environment_ids { + return current; + } + let mcp_config = self + .services + .mcp_manager + .runtime_config_for_step( + &turn_context.config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) + .await; + self.refresh_mcp_servers_inner( + turn_context, + mcp_config, + environments, + &available_environment_ids, + Some(self.mcp_elicitation_reviewer()), + ) + .await + } + + pub(crate) async fn resolve_selected_capability_roots_for_step( + &self, + environments: &TurnEnvironmentSnapshot, + ) -> Vec { + self.services + .turn_environments + .environment_manager() + .resolve_selected_capability_roots( + &self.services.selected_capability_roots, + &environments.captured_environments(), + ) + .await + } + pub(crate) fn mcp_elicitation_reviewer(self: &Arc) -> ElicitationReviewerHandle { Arc::new(GuardianMcpElicitationReviewer::new(self)) } @@ -207,51 +276,22 @@ impl Session { .await } - pub async fn read_resource( - &self, - server: &str, - params: ReadResourceRequestParams, - ) -> anyhow::Result { - self.services - .latest_mcp_runtime() - .manager_arc() - .read_resource(server, params) - .await - } - - pub async fn call_tool( - &self, - server: &str, - tool: &str, - arguments: Option, - meta: Option, - ) -> anyhow::Result { - self.services - .latest_mcp_runtime() - .manager_arc() - .call_tool(server, tool, arguments, meta) - .await - } - async fn refresh_mcp_servers_inner( &self, turn_context: &TurnContext, - mut mcp_config: McpConfig, - configured_mcp_servers: HashMap, + mcp_config: McpConfig, + environments: &TurnEnvironmentSnapshot, + available_environment_ids: &[String], elicitation_reviewer: Option, - ) { - mcp_config.mcp_server_catalog = mcp_config - .mcp_server_catalog - .with_materialized_servers(configured_mcp_servers); - let mcp_config = Arc::new(mcp_config); + ) -> Arc { let auth = self.services.auth_manager.auth().await; + let mcp_config = Arc::new(mcp_config); let tool_plugin_provenance = codex_mcp::tool_plugin_provenance(&mcp_config); let mcp_servers = effective_mcp_servers(&mcp_config, auth.as_ref()); let environment_manager = self.services.turn_environments.environment_manager(); // TODO(anp): Migrate MCP runtime cwd plumbing to PathUri so foreign environment cwd // values can be used without falling back to the legacy host cwd. - let cwd = turn_context - .environments + let cwd = environments .primary() .and_then(|turn_environment| turn_environment.cwd().to_abs_path().ok()) .map(|cwd| cwd.to_path_buf()) @@ -305,10 +345,18 @@ impl Session { refreshed_manager .set_elicitations_auto_deny(current_manager.manager().elicitations_auto_deny()); } - self.services - .publish_mcp_runtime(mcp_config, mcp_runtime_context, refreshed_manager); + self.services.publish_mcp_runtime( + mcp_config, + mcp_runtime_context, + available_environment_ids.to_vec(), + refreshed_manager, + ) } + #[expect( + clippy::await_holding_invalid_type, + reason = "MCP runtime refresh and publication must remain serialized" + )] pub(crate) async fn refresh_mcp_servers_if_requested( &self, turn_context: &TurnContext, @@ -365,9 +413,33 @@ impl Session { return; } - let mcp_config = self.runtime_mcp_config(&refresh_config).await; - self.refresh_mcp_servers_inner(turn_context, mcp_config, mcp_servers, elicitation_reviewer) + let _guard = self.services.mcp_projection_lock.lock().await; + let available_environment_ids = self + .services + .latest_mcp_runtime() + .available_environment_ids() + .to_vec(); + let mut mcp_config = self + .services + .mcp_manager + .runtime_config_for_step( + &refresh_config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) .await; + mcp_config.mcp_server_catalog = mcp_config + .mcp_server_catalog + .with_materialized_servers(mcp_servers); + self.refresh_mcp_servers_inner( + turn_context, + mcp_config, + &turn_context.environments, + &available_environment_ids, + elicitation_reviewer, + ) + .await; } pub(crate) async fn set_openai_form_elicitation_support( @@ -398,16 +470,54 @@ impl Session { Ok(()) } + #[expect( + clippy::await_holding_invalid_type, + reason = "MCP runtime refresh and publication must remain serialized" + )] pub(crate) async fn refresh_mcp_servers_now( &self, turn_context: &TurnContext, refresh_config: &Config, elicitation_reviewer: Option, ) { - let mcp_config = self.runtime_mcp_config(refresh_config).await; - let mcp_servers = codex_mcp::configured_mcp_servers(&mcp_config); - self.refresh_mcp_servers_inner(turn_context, mcp_config, mcp_servers, elicitation_reviewer) + let _guard = self.services.mcp_projection_lock.lock().await; + let available_environment_ids = self + .services + .latest_mcp_runtime() + .available_environment_ids() + .to_vec(); + let mcp_config = self + .services + .mcp_manager + .runtime_config_for_step( + refresh_config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) .await; + self.refresh_mcp_servers_inner( + turn_context, + mcp_config, + &turn_context.environments, + &available_environment_ids, + elicitation_reviewer, + ) + .await; + } + + fn available_selected_environment_ids( + selected_capability_roots: &[ResolvedSelectedCapabilityRoot], + ) -> Vec { + let mut available = Vec::new(); + for root in selected_capability_roots { + let CapabilityRootLocation::Environment { environment_id, .. } = + &root.selected_root().location; + if !available.contains(environment_id) { + available.push(environment_id.clone()); + } + } + available } #[cfg(test)] diff --git a/codex-rs/core/src/session/mcp_runtime.rs b/codex-rs/core/src/session/mcp_runtime.rs index dc5ea011c..34cf07cc3 100644 --- a/codex-rs/core/src/session/mcp_runtime.rs +++ b/codex-rs/core/src/session/mcp_runtime.rs @@ -10,6 +10,7 @@ pub struct McpRuntimeSnapshot { config: Arc, manager: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, } impl McpRuntimeSnapshot { @@ -17,11 +18,13 @@ impl McpRuntimeSnapshot { config: Arc, manager: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, ) -> Self { Self { config, manager, runtime_context, + available_environment_ids, } } @@ -41,6 +44,10 @@ impl McpRuntimeSnapshot { &self.runtime_context } + pub(crate) fn available_environment_ids(&self) -> &[String] { + &self.available_environment_ids + } + #[cfg(test)] pub(crate) fn new_uninitialized_for_test(config: &crate::config::Config) -> Arc { use codex_exec_server::EnvironmentManager; @@ -81,6 +88,7 @@ impl McpRuntimeSnapshot { Arc::new(mcp_config), Arc::new(manager), runtime_context, + Vec::new(), )) } } diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 9421e07a6..000825754 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -100,7 +100,6 @@ use codex_protocol::dynamic_tools::DynamicToolResponse; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::items::TurnItem; use codex_protocol::items::UserMessageItem; -use codex_protocol::mcp::CallToolResult; use codex_protocol::models::ActivePermissionProfile; use codex_protocol::models::AdditionalPermissionProfile; use codex_protocol::models::BaseInstructions; @@ -158,8 +157,6 @@ use codex_utils_path_uri::PathUri; use futures::future::BoxFuture; use futures::future::Shared; use futures::prelude::*; -use rmcp::model::ReadResourceRequestParams; -use rmcp::model::ReadResourceResult; use rmcp::model::RequestId; use serde_json::Value; use tokio::sync::Mutex; @@ -2825,7 +2822,7 @@ impl Session { /// `run_turn` and pass the result down; standalone request or history boundaries may capture /// their own step. pub(crate) async fn capture_step_context( - &self, + self: &Arc, turn_context: Arc, ) -> Arc { let deferred_executor_enabled = turn_context @@ -2846,15 +2843,15 @@ impl Session { } let loaded_agents_md = self.services.agents_md_manager.get_loaded().await; let selected_capability_roots = self - .services - .turn_environments - .environment_manager() - .resolve_selected_capability_roots( - &self.services.selected_capability_roots, - &environments.captured_environments(), + .resolve_selected_capability_roots_for_step(&environments) + .await; + let mcp = self + .mcp_runtime_for_step( + turn_context.as_ref(), + &environments, + &selected_capability_roots, ) .await; - let mcp = self.services.latest_mcp_runtime(); Arc::new(StepContext::new( turn_context, environments, diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 2c9e57036..0e1609207 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -661,6 +661,7 @@ impl Session { let config_for_mcp = Arc::clone(&config); let mcp_manager_for_mcp = Arc::clone(&mcp_manager); let mcp_thread_init_for_startup = &mcp_thread_init; + let thread_extension_data_for_mcp = &thread_extension_data; let mcp_runtime_cwd = session_configuration .environment_selections() .first() @@ -673,7 +674,12 @@ impl Session { let auth_and_mcp_fut = async move { let auth = auth_manager_clone.auth().await; let mcp_config = mcp_manager_for_mcp - .runtime_config_for_thread(&config_for_mcp, mcp_thread_init_for_startup) + .runtime_config_for_step( + &config_for_mcp, + mcp_thread_init_for_startup, + thread_extension_data_for_mcp, + /*available_environment_ids*/ &[], + ) .await; let mcp_servers = codex_mcp::effective_mcp_servers(&mcp_config, auth.as_ref()); let tool_plugin_provenance = codex_mcp::tool_plugin_provenance(&mcp_config); @@ -1039,6 +1045,7 @@ impl Session { // setup is straightforward enough and performs well. mcp_connection_manager, mcp_runtime: arc_swap::ArcSwapOption::empty(), + mcp_projection_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -1210,6 +1217,7 @@ impl Session { .install_mcp_connection_manager( Arc::new(mcp_config), mcp_runtime_context, + /*available_environment_ids*/ Vec::new(), mcp_connection_manager, ) .await?; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index d0aac0424..57f1ce908 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -5372,6 +5372,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { let services = SessionServices { mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from(mcp_runtime.manager_arc())), mcp_runtime: arc_swap::ArcSwapOption::from(Some(mcp_runtime)), + mcp_projection_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -7446,6 +7447,7 @@ where let services = SessionServices { mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from(mcp_runtime.manager_arc())), mcp_runtime: arc_swap::ArcSwapOption::from(Some(mcp_runtime)), + mcp_projection_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -7603,6 +7605,7 @@ pub(crate) async fn make_session_and_context_with_rx() -> ( #[tokio::test] async fn refresh_mcp_servers_keeps_the_previous_runtime_alive() { let (session, turn_context) = make_session_and_context().await; + let session = Arc::new(session); let turn_context = Arc::new(turn_context); let old_runtime = session.services.latest_mcp_runtime(); let step_context = session diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 2cb1f2cad..7b509ae42 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -171,8 +171,13 @@ pub(crate) async fn run_turn( .record_context_updates_and_set_reference_context_item(first_step_context.as_ref()) .await; - let Some((injection_items, explicitly_enabled_connectors)) = - build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await + let Some((injection_items, explicitly_enabled_connectors)) = build_skills_and_plugins( + &sess, + first_step_context.as_ref(), + &input, + &cancellation_token, + ) + .await else { return Ok(None); }; @@ -504,10 +509,11 @@ async fn run_hooks_and_record_inputs( #[instrument(level = "trace", skip_all)] async fn build_skills_and_plugins( sess: &Arc, - turn_context: &TurnContext, + step_context: &StepContext, input: &[TurnInput], cancellation_token: &CancellationToken, ) -> Option<(Vec, HashSet)> { + let turn_context = step_context.turn.as_ref(); // Guardian input embeds the parent transcript as untrusted evidence. Do not interpret skill or // plugin mentions from that generated prompt as requests to inject additional instructions. if crate::guardian::is_guardian_reviewer_source(&turn_context.session_source) { @@ -538,17 +544,14 @@ async fn build_skills_and_plugins( // enabled plugins, then converted into turn-scoped guidance below. let mentioned_plugins = collect_explicit_plugin_mentions(&user_input, loaded_plugins.capability_summaries()); - let connector_snapshot = codex_connectors::ConnectorSnapshot::from_plugin_capability_summaries( - loaded_plugins.capability_summaries(), - ); + let connector_snapshot = step_context.mcp.config().connector_snapshot.clone(); let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() { // Plugin mentions need raw MCP/app inventory even when app tools // are normally hidden so we can describe the plugin's currently // usable capabilities for this turn. - match sess - .services - .mcp_connection_manager - .load_full() + match step_context + .mcp + .manager_arc() .list_all_tools() .or_cancel(cancellation_token) .await @@ -1189,9 +1192,7 @@ pub(crate) async fn built_tools( .plugins_for_config(&turn_context.config.plugins_config_input()) .instrument(trace_span!("built_tools.load_plugins")) .await; - let connector_snapshot = codex_connectors::ConnectorSnapshot::from_plugin_capability_summaries( - loaded_plugins.capability_summaries(), - ); + let connector_snapshot = step_context.mcp.config().connector_snapshot.clone(); let apps_enabled = turn_context.apps_enabled(); let accessible_connectors = diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 1bee45cce..a611b9488 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -51,6 +51,8 @@ pub(crate) struct SessionServices { pub(crate) mcp_connection_manager: Arc>, /// The latest atomically published MCP config and manager pair. pub(crate) mcp_runtime: ArcSwapOption, + /// Serializes environment-driven runtime rebuilds. + pub(crate) mcp_projection_lock: Mutex<()>, pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, #[cfg_attr(not(unix), allow(dead_code))] @@ -106,9 +108,11 @@ impl SessionServices { &self, config: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, manager: McpConnectionManager, ) -> Result<()> { - let runtime = self.publish_mcp_runtime(config, runtime_context, manager); + let runtime = + self.publish_mcp_runtime(config, runtime_context, available_environment_ids, manager); runtime.manager().validate_required_servers().await } @@ -116,13 +120,19 @@ impl SessionServices { &self, config: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, manager: McpConnectionManager, ) -> Arc { let manager = Arc::new(manager); // Publish the manager for legacy resource clients first. Once the paired snapshot is // visible, every model-scoped consumer observes this exact manager. self.mcp_connection_manager.store(Arc::clone(&manager)); - let runtime = Arc::new(McpRuntimeSnapshot::new(config, manager, runtime_context)); + let runtime = Arc::new(McpRuntimeSnapshot::new( + config, + manager, + runtime_context, + available_environment_ids, + )); self.mcp_runtime.store(Some(Arc::clone(&runtime))); runtime } diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 9bccf8b6c..5148f1625 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -584,8 +584,28 @@ async fn start_thread_seeds_extension_data_for_mcp_and_lifecycle_contributors() }) .await .expect("start second thread"); - let first_resolved = first_thread.thread.runtime_mcp_config(&config).await; - let second_resolved = second_thread.thread.runtime_mcp_config(&config).await; + let first_session = &first_thread.thread.codex.session; + let first_resolved = first_session + .services + .mcp_manager + .runtime_config_for_step( + &config, + &first_session.services.mcp_thread_init, + &first_session.services.thread_extension_data, + /*available_environment_ids*/ &[], + ) + .await; + let second_session = &second_thread.thread.codex.session; + let second_resolved = second_session + .services + .mcp_manager + .runtime_config_for_step( + &config, + &second_session.services.mcp_thread_init, + &second_session.services.thread_extension_data, + /*available_environment_ids*/ &[], + ) + .await; assert_eq!( *lifecycle_observed diff --git a/codex-rs/ext/extension-api/src/contributors/mcp.rs b/codex-rs/ext/extension-api/src/contributors/mcp.rs index 91e9cb459..499657f55 100644 --- a/codex-rs/ext/extension-api/src/contributors/mcp.rs +++ b/codex-rs/ext/extension-api/src/contributors/mcp.rs @@ -1,17 +1,22 @@ use codex_config::McpServerConfig; +use crate::ExtensionData; use crate::ExtensionDataInit; /// Input supplied while resolving MCP server contributions. /// -/// Thread-scoped implementations can read the immutable host-seeded inputs -/// through [`Self::thread_init`]. Implementations should not retain borrowed -/// context after contribution completes. +/// Thread-scoped implementations can read stable host inputs through [`Self::thread_init`] and +/// keep their cache in [`Self::thread_store`]. Implementations should not retain borrowed context +/// after contribution completes. pub struct McpServerContributionContext<'a, C> { /// Host configuration visible during MCP resolution. config: &'a C, - /// Initial inputs for the active thread, when resolution is thread-scoped. + /// Extension-owned data for the active thread, when resolution is thread-scoped. + thread_store: Option<&'a ExtensionData>, + /// Stable host inputs for the active thread, when resolution is thread-scoped. thread_init: Option<&'a ExtensionDataInit>, + /// Environment IDs whose selected roots may contribute to this exact step. + available_environment_ids: Option<&'a [String]>, } impl Clone for McpServerContributionContext<'_, C> { @@ -27,15 +32,24 @@ impl<'a, C> McpServerContributionContext<'a, C> { pub fn global(config: &'a C) -> Self { Self { config, + thread_store: None, thread_init: None, + available_environment_ids: None, } } - /// Creates context for one active thread runtime. - pub fn for_thread(config: &'a C, thread_init: &'a ExtensionDataInit) -> Self { + /// Creates context for one model step using only currently available environments. + pub fn for_step( + config: &'a C, + thread_init: &'a ExtensionDataInit, + thread_store: &'a ExtensionData, + available_environment_ids: &'a [String], + ) -> Self { Self { config, + thread_store: Some(thread_store), thread_init: Some(thread_init), + available_environment_ids: Some(available_environment_ids), } } @@ -44,10 +58,23 @@ impl<'a, C> McpServerContributionContext<'a, C> { self.config } - /// Returns the frozen initial inputs when resolving for a running thread. + /// Returns extension-owned state when resolving for a running thread. + pub fn thread_store(&self) -> Option<&'a ExtensionData> { + self.thread_store + } + + /// Returns stable host inputs when resolving for a running thread. pub fn thread_init(&self) -> Option<&'a ExtensionDataInit> { self.thread_init } + + /// Returns the exact environment availability projection for a model step. + /// + /// `Some` means contributors must omit selected roots whose environment ID is absent from the + /// slice. Global resolution returns `None` because it has no thread environments. + pub fn available_environment_ids(&self) -> Option<&'a [String]> { + self.available_environment_ids + } } /// One extension-owned overlay for the runtime MCP server configuration. @@ -66,6 +93,12 @@ pub enum McpServerContribution { selection_order: usize, config: Box, }, + /// Adds connector IDs declared by a plugin selected for this thread. + SelectedPluginConnectors { + plugin_id: String, + plugin_display_name: String, + connector_ids: Vec, + }, /// Removes a named MCP server. Remove { name: String }, } diff --git a/codex-rs/ext/mcp/Cargo.toml b/codex-rs/ext/mcp/Cargo.toml index b98c0905d..d918bd5bd 100644 --- a/codex-rs/ext/mcp/Cargo.toml +++ b/codex-rs/ext/mcp/Cargo.toml @@ -16,6 +16,7 @@ workspace = true codex-core = { workspace = true } codex-core-plugins = { workspace = true } codex-config = { workspace = true } +codex-connectors-extension = { workspace = true } codex-exec-server = { workspace = true } codex-extension-api = { workspace = true } codex-features = { workspace = true } diff --git a/codex-rs/ext/mcp/src/executor_plugin.rs b/codex-rs/ext/mcp/src/executor_plugin.rs index 69d1a47b3..32637b4f4 100644 --- a/codex-rs/ext/mcp/src/executor_plugin.rs +++ b/codex-rs/ext/mcp/src/executor_plugin.rs @@ -1,44 +1,47 @@ +use codex_connectors_extension::ExecutorPluginConnectorProvider; use codex_core::config::Config; use codex_core_plugins::ExecutorPluginProvider; use codex_exec_server::EnvironmentManager; -use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionFuture; use codex_extension_api::McpServerContribution; use codex_extension_api::McpServerContributionContext; use codex_extension_api::McpServerContributor; +use codex_protocol::capabilities::CapabilityRootLocation; use codex_protocol::capabilities::SelectedCapabilityRoot; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::OnceCell; +use std::sync::Mutex; use self::provider::ExecutorPluginMcpProvider; mod provider; -/// Frozen MCP declarations for one selected package. +/// Frozen MCP and connector declarations for one selected package. /// /// Each server config retains the stable logical environment ID. Reconnection may replace the /// concrete environment instance without changing that authority. #[derive(Clone)] -struct SelectedPluginMcpServers { +struct SelectedPluginMetadata { plugin_id: String, plugin_display_name: String, - selection_order: usize, servers: Vec<(String, codex_config::McpServerConfig)>, + connector_ids: Vec, } #[derive(Default)] pub(crate) struct SelectedExecutorPluginMcpState { - snapshot: OnceCell>, + cache: Mutex>, } -pub(crate) fn seed_thread_state(thread_init: &mut ExtensionDataInit) { - thread_init.insert(SelectedExecutorPluginMcpState::default()); +struct CachedSelectedRoot { + root: SelectedCapabilityRoot, + metadata: Option, } pub(crate) struct SelectedExecutorPluginMcpContributor { plugin_provider: ExecutorPluginProvider, mcp_provider: ExecutorPluginMcpProvider, + connector_provider: ExecutorPluginConnectorProvider, } impl SelectedExecutorPluginMcpContributor { @@ -46,46 +49,87 @@ impl SelectedExecutorPluginMcpContributor { Self { plugin_provider: ExecutorPluginProvider::new(Arc::clone(&environment_manager)), mcp_provider: ExecutorPluginMcpProvider, + connector_provider: ExecutorPluginConnectorProvider, } } - async fn resolve_snapshot( + /// Returns metadata for one stable selected root. + /// + /// Successful resolution, including a root that is not a plugin or declares no capabilities, + /// is cached until the thread state is dropped. Environment availability never invalidates + /// this cache; it only controls whether the cached metadata is projected into a model step. + async fn metadata_for_root( &self, - selected_roots: &[SelectedCapabilityRoot], - ) -> Vec { - let mut snapshot = Vec::new(); + state: &SelectedExecutorPluginMcpState, + selected_root: &SelectedCapabilityRoot, + ) -> Option { + if let Some(cached) = state + .cache + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .iter() + .find(|cached| cached.root == *selected_root) + { + return cached.metadata.clone(); + } - for (selection_order, selected_root) in selected_roots.iter().enumerate() { - let plugin = match self.plugin_provider.resolve_bound(selected_root).await { - Ok(Some(plugin)) => plugin, - Ok(None) => continue, - Err(err) => { - tracing::warn!( - selected_root = selected_root.id, - error = %err, - "failed to resolve selected executor plugin for MCP discovery" - ); - continue; - } - }; - match self.mcp_provider.load(&plugin).await { - Ok(servers) => snapshot.push(SelectedPluginMcpServers { - plugin_id: plugin.plugin().selected_root_id().to_string(), - plugin_display_name: plugin.plugin().manifest().display_name().to_string(), - selection_order, - servers, - }), - Err(err) => { + let plugin = match self.plugin_provider.resolve_bound(selected_root).await { + Ok(plugin) => plugin, + Err(err) => { + tracing::warn!( + selected_root = selected_root.id, + error = %err, + "failed to resolve selected executor plugin" + ); + return None; + } + }; + let metadata = match plugin { + Some(plugin) => { + let servers = self.mcp_provider.load(&plugin).await.unwrap_or_else(|err| { tracing::warn!( selected_root = selected_root.id, error = %err, "failed to load selected executor plugin MCP servers" ); - } + Vec::new() + }); + let connector_ids = self + .connector_provider + .load(&plugin) + .await + .unwrap_or_else(|err| { + tracing::warn!( + selected_root = selected_root.id, + error = %err, + "failed to load selected executor plugin connectors" + ); + Vec::new() + }) + .into_iter() + .map(|declaration| declaration.connector_id.0) + .collect(); + Some(SelectedPluginMetadata { + plugin_id: plugin.plugin().selected_root_id().to_string(), + plugin_display_name: plugin.plugin().manifest().display_name().to_string(), + servers, + connector_ids, + }) } + None => None, + }; + let mut cache = state + .cache + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if let Some(cached) = cache.iter().find(|cached| cached.root == *selected_root) { + return cached.metadata.clone(); } - - snapshot + cache.push(CachedSelectedRoot { + root: selected_root.clone(), + metadata: metadata.clone(), + }); + metadata } } @@ -102,20 +146,31 @@ impl McpServerContributor for SelectedExecutorPluginMcpContributor { let Some(thread_init) = context.thread_init() else { return Vec::new(); }; + let Some(thread_store) = context.thread_store() else { + return Vec::new(); + }; let Some(selected_roots) = thread_init.get::>() else { return Vec::new(); }; - let Some(state) = thread_init.get::() else { - tracing::warn!("selected executor plugin MCP state was not initialized"); - return Vec::new(); - }; - let snapshot = state - .snapshot - .get_or_init(|| self.resolve_snapshot(selected_roots.as_ref())) - .await; + let state = thread_store.get_or_init(SelectedExecutorPluginMcpState::default); let mut contributions = Vec::new(); - for plugin in snapshot { + for (selection_order, selected_root) in selected_roots.iter().enumerate() { + let CapabilityRootLocation::Environment { environment_id, .. } = + &selected_root.location; + if context + .available_environment_ids() + .is_some_and(|available| { + !available + .iter() + .any(|available| available == environment_id) + }) + { + continue; + } + let Some(plugin) = self.metadata_for_root(&state, selected_root).await else { + continue; + }; let mut servers = plugin.servers.iter().cloned().collect::>(); context .config() @@ -127,10 +182,17 @@ impl McpServerContributor for SelectedExecutorPluginMcpContributor { name, plugin_id: plugin.plugin_id.clone(), plugin_display_name: plugin.plugin_display_name.clone(), - selection_order: plugin.selection_order, + selection_order, config: Box::new(config), } })); + if !plugin.connector_ids.is_empty() { + contributions.push(McpServerContribution::SelectedPluginConnectors { + plugin_id: plugin.plugin_id, + plugin_display_name: plugin.plugin_display_name, + connector_ids: plugin.connector_ids, + }); + } } contributions diff --git a/codex-rs/ext/mcp/src/lib.rs b/codex-rs/ext/mcp/src/lib.rs index e32316ac3..9c22bad5a 100644 --- a/codex-rs/ext/mcp/src/lib.rs +++ b/codex-rs/ext/mcp/src/lib.rs @@ -51,10 +51,3 @@ pub fn install_executor_plugins( executor_plugin::SelectedExecutorPluginMcpContributor::new(environment_manager), )); } - -/// Seeds the per-thread snapshot used by selected executor plugin MCP discovery. -pub fn initialize_executor_plugin_thread_data( - thread_init: &mut codex_extension_api::ExtensionDataInit, -) { - executor_plugin::seed_thread_state(thread_init); -} diff --git a/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs b/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs index 7bdbc83d6..65a4248a3 100644 --- a/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs +++ b/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs @@ -3,6 +3,7 @@ use codex_core::config::Config; use codex_core::config::ConfigBuilder; use codex_exec_server::EnvironmentManager; use codex_exec_server::LOCAL_ENVIRONMENT_ID; +use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionRegistryBuilder; use codex_extension_api::McpServerContribution; @@ -109,12 +110,15 @@ async fn selected_plugin_contributions( path: PathUri::from_host_native_path(plugin_root)?, }, }]); - codex_mcp_extension::initialize_executor_plugin_thread_data(&mut thread_init); + let thread_store = ExtensionData::new_with_init("test-thread", thread_init.clone()); + let available_environment_ids = vec![LOCAL_ENVIRONMENT_ID.to_string()]; Ok(registry.mcp_server_contributors()[0] - .contribute(McpServerContributionContext::for_thread( + .contribute(McpServerContributionContext::for_step( config, &thread_init, + &thread_store, + &available_environment_ids, )) .await .into_iter() @@ -132,7 +136,9 @@ async fn selected_plugin_contributions( selection_order, enabled: config.enabled, }, - McpServerContribution::Set { .. } | McpServerContribution::Remove { .. } => { + McpServerContribution::Set { .. } + | McpServerContribution::SelectedPluginConnectors { .. } + | McpServerContribution::Remove { .. } => { panic!("expected selected plugin contribution") } })