diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index b14702282..75c02adfb 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -3359,6 +3359,7 @@ name = "codex-mcp-extension" version = "0.0.0" dependencies = [ "codex-config", + "codex-connectors-extension", "codex-core", "codex-core-plugins", "codex-exec-server", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index ef17cb9ce..0824f7db7 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -166,6 +166,7 @@ codex-code-mode-protocol = { path = "code-mode-protocol" } codex-home = { path = "codex-home" } codex-config = { path = "config" } codex-connectors = { path = "connectors" } +codex-connectors-extension = { path = "ext/connectors" } codex-context-fragments = { path = "context-fragments" } codex-core = { path = "core" } codex-core-api = { path = "core-api" } diff --git a/codex-rs/app-server/src/request_processors/mcp_processor.rs b/codex-rs/app-server/src/request_processors/mcp_processor.rs index abefd2601..ac2125c2c 100644 --- a/codex-rs/app-server/src/request_processors/mcp_processor.rs +++ b/codex-rs/app-server/src/request_processors/mcp_processor.rs @@ -124,7 +124,7 @@ impl McpRequestProcessor { let (mcp_config, runtime_context) = match thread_id.as_deref() { Some(thread_id) => { let (_, thread) = self.load_thread(thread_id).await?; - let runtime = thread.current_mcp_runtime(); + let runtime = thread.current_mcp_runtime().await; (runtime.config().clone(), runtime.runtime_context().clone()) } None => { @@ -246,14 +246,21 @@ impl McpRequestProcessor { let mcp_manager = self.thread_manager.mcp_manager(); let codex_apps_tools_cache = mcp_manager.codex_apps_tools_cache(); let auth = self.auth_manager.auth().await; - let mcp_config = match thread { - Some(thread) => thread.runtime_mcp_config(&config).await, - None => mcp_manager.runtime_config(&config).await, + let (mcp_config, runtime_context) = match thread { + Some(thread) => { + let mcp_config = thread.runtime_mcp_config(&config).await; + let runtime = thread.current_mcp_runtime().await; + (mcp_config, runtime.runtime_context().clone()) + } + None => { + let mcp_config = mcp_manager.runtime_config(&config).await; + let runtime_context = McpRuntimeContext::new( + self.thread_manager.environment_manager(), + config.cwd.to_path_buf(), + ); + (mcp_config, runtime_context) + } }; - let runtime_context = McpRuntimeContext::new( - self.thread_manager.environment_manager(), - config.cwd.to_path_buf(), - ); tokio::spawn(async move { Self::list_mcp_server_status_task( diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 0fc37d1a4..39283bb35 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1155,7 +1155,6 @@ impl ThreadRequestProcessor { let mut thread_extension_init = ExtensionDataInit::new(); if !selected_capability_roots.is_empty() { thread_extension_init.insert(selected_capability_roots); - codex_mcp_extension::initialize_executor_plugin_thread_data(&mut thread_extension_init); } let create_thread_started_at = std::time::Instant::now(); let NewThread { diff --git a/codex-rs/app-server/tests/suite/v2/executor_mcp.rs b/codex-rs/app-server/tests/suite/v2/executor_mcp.rs index 67d97bc24..aa650c3ec 100644 --- a/codex-rs/app-server/tests/suite/v2/executor_mcp.rs +++ b/codex-rs/app-server/tests/suite/v2/executor_mcp.rs @@ -191,7 +191,6 @@ HTTP_PROXY = {http_proxy} ) .await?; - std::fs::write(plugin.path().join(".mcp.json"), r#"{"mcpServers":{}}"#)?; let config_path = codex_home.path().join("config.toml"); let mut config = std::fs::read_to_string(&config_path)?; config.push_str(&format!( diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 46b38af15..90555db91 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -595,8 +595,14 @@ impl CodexThread { } /// Returns the exact MCP config, environment bindings, and manager most recently published. - pub fn current_mcp_runtime(&self) -> Arc { - self.codex.session.services.latest_mcp_runtime() + pub async fn current_mcp_runtime(&self) -> Arc { + let turn_context = self.codex.session.new_default_turn().await; + self.codex + .session + .capture_step_context(turn_context) + .await + .mcp + .clone() } pub fn multi_agent_version(&self) -> Option { @@ -620,8 +626,9 @@ impl CodexThread { uri: &str, ) -> anyhow::Result { let result = self - .codex - .session + .current_mcp_runtime() + .await + .manager_arc() .read_resource(server, ReadResourceRequestParams::new(uri)) .await?; @@ -635,8 +642,9 @@ impl CodexThread { arguments: Option, meta: Option, ) -> anyhow::Result { - self.codex - .session + self.current_mcp_runtime() + .await + .manager_arc() .call_tool(server, tool, arguments, meta) .await } diff --git a/codex-rs/core/src/mcp.rs b/codex-rs/core/src/mcp.rs index 658c5471e..45ca0d563 100644 --- a/codex-rs/core/src/mcp.rs +++ b/codex-rs/core/src/mcp.rs @@ -3,7 +3,10 @@ use std::sync::Arc; use crate::config::Config; use codex_config::McpServerConfig; +use codex_connectors::ConnectorSnapshot; +use codex_connectors::PluginConnectorSource; use codex_core_plugins::PluginsManager; +use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionRegistry; use codex_extension_api::McpServerContribution; @@ -18,6 +21,7 @@ use codex_mcp::McpServerRegistration; use codex_mcp::codex_apps_mcp_server_config; use codex_mcp::configured_mcp_servers; use codex_mcp::effective_mcp_servers; +use codex_plugin::AppConnectorId; const LEGACY_CODEX_APPS_REGISTRATION_ID: &str = "legacy_codex_apps"; @@ -69,28 +73,32 @@ impl McpManager { /// Returns the MCP config after applying compatibility built-ins and /// runtime-only extension overlays. pub async fn runtime_config(&self, config: &Config) -> McpConfig { - self.runtime_config_with_context(config, /*thread_init*/ None) + self.runtime_config_with_context(McpServerContributionContext::global(config)) .await } - pub(crate) async fn runtime_config_for_thread( + pub(crate) async fn runtime_config_for_step( &self, config: &Config, thread_init: &ExtensionDataInit, + thread_store: &ExtensionData, + available_environment_ids: &[String], ) -> McpConfig { - self.runtime_config_with_context(config, Some(thread_init)) - .await + self.runtime_config_with_context(McpServerContributionContext::for_step( + config, + thread_init, + thread_store, + available_environment_ids, + )) + .await } async fn runtime_config_with_context( &self, - config: &Config, - thread_init: Option<&ExtensionDataInit>, + context: McpServerContributionContext<'_, Config>, ) -> McpConfig { - let context = match thread_init { - Some(thread_init) => McpServerContributionContext::for_thread(config, thread_init), - None => McpServerContributionContext::global(config), - }; + let config = context.config(); + let mut selected_plugin_connector_sources = Vec::new(); let mut selected_plugin_registrations = Vec::new(); let mut overlays = Vec::new(); // A contributor can emit multiple ordered actions, so order each action globally rather @@ -121,6 +129,17 @@ impl McpManager { *config, ), ), + McpServerContribution::SelectedPluginConnectors { + plugin_id, + plugin_display_name, + connector_ids, + } => selected_plugin_connector_sources.push( + PluginConnectorSource::from_connector_ids( + plugin_id, + plugin_display_name, + connector_ids.into_iter().map(AppConnectorId), + ), + ), McpServerContribution::Remove { name } => { overlays.push(OrderedMcpOverlay::Remove { contributor_id: contributor.id(), @@ -186,6 +205,12 @@ impl McpManager { ); } mcp_config.mcp_server_catalog = catalog; + mcp_config.connector_snapshot = + mcp_config + .connector_snapshot + .merged_with(&ConnectorSnapshot::from_plugin_sources( + selected_plugin_connector_sources, + )); mcp_config } diff --git a/codex-rs/core/src/prompt_debug.rs b/codex-rs/core/src/prompt_debug.rs index a866116a2..cac9bb41a 100644 --- a/codex-rs/core/src/prompt_debug.rs +++ b/codex-rs/core/src/prompt_debug.rs @@ -64,7 +64,7 @@ pub async fn build_prompt_input( ); let thread = thread_manager.start_thread(config).await?; - let output = build_prompt_input_from_session(thread.thread.codex.session.as_ref(), input).await; + let output = build_prompt_input_from_session(&thread.thread.codex.session, input).await; let shutdown = thread.thread.shutdown_and_wait().await; let _removed = thread_manager.remove_thread(&thread.thread_id).await; @@ -73,7 +73,7 @@ pub async fn build_prompt_input( } pub(crate) async fn build_prompt_input_from_session( - sess: &Session, + sess: &Arc, input: Vec, ) -> CodexResult> { let turn_context = sess.new_default_turn().await; diff --git a/codex-rs/core/src/session/mcp.rs b/codex-rs/core/src/session/mcp.rs index b9e87f0c8..50921f159 100644 --- a/codex-rs/core/src/session/mcp.rs +++ b/codex-rs/core/src/session/mcp.rs @@ -1,7 +1,9 @@ use super::*; +use codex_exec_server::ResolvedSelectedCapabilityRoot; use codex_mcp::ElicitationReviewRequest; use codex_mcp::ElicitationReviewer; use codex_mcp::ElicitationReviewerHandle; +use codex_protocol::capabilities::CapabilityRootLocation; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::mcp_approval_meta::APPROVAL_KIND_KEY as MCP_ELICITATION_APPROVAL_KIND_KEY; use codex_protocol::mcp_approval_meta::APPROVAL_KIND_MCP_TOOL_CALL as MCP_ELICITATION_APPROVAL_KIND_MCP_TOOL_CALL; @@ -75,9 +77,20 @@ impl ElicitationReviewer for GuardianMcpElicitationReviewer { impl Session { pub(crate) async fn runtime_mcp_config(&self, config: &Config) -> McpConfig { + let environments = self.services.turn_environments.snapshot().await; + let selected_capability_roots = self + .resolve_selected_capability_roots_for_step(&environments) + .await; + let available_environment_ids = + Self::available_selected_environment_ids(&selected_capability_roots); self.services .mcp_manager - .runtime_config_for_thread(config, &self.services.mcp_thread_init) + .runtime_config_for_step( + config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) .await } @@ -88,6 +101,62 @@ impl Session { codex_mcp::configured_mcp_servers(&self.runtime_mcp_config(config).await) } + #[expect( + clippy::await_holding_invalid_type, + reason = "MCP runtime comparison and publication must remain serialized" + )] + pub(crate) async fn mcp_runtime_for_step( + self: &Arc, + turn_context: &TurnContext, + environments: &TurnEnvironmentSnapshot, + selected_capability_roots: &[ResolvedSelectedCapabilityRoot], + ) -> Arc { + let available_environment_ids = + Self::available_selected_environment_ids(selected_capability_roots); + let current = self.services.latest_mcp_runtime(); + if current.available_environment_ids() == available_environment_ids { + return current; + } + + let _guard = self.services.mcp_projection_lock.lock().await; + let current = self.services.latest_mcp_runtime(); + if current.available_environment_ids() == available_environment_ids { + return current; + } + let mcp_config = self + .services + .mcp_manager + .runtime_config_for_step( + &turn_context.config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) + .await; + self.refresh_mcp_servers_inner( + turn_context, + mcp_config, + environments, + &available_environment_ids, + Some(self.mcp_elicitation_reviewer()), + ) + .await + } + + pub(crate) async fn resolve_selected_capability_roots_for_step( + &self, + environments: &TurnEnvironmentSnapshot, + ) -> Vec { + self.services + .turn_environments + .environment_manager() + .resolve_selected_capability_roots( + &self.services.selected_capability_roots, + &environments.captured_environments(), + ) + .await + } + pub(crate) fn mcp_elicitation_reviewer(self: &Arc) -> ElicitationReviewerHandle { Arc::new(GuardianMcpElicitationReviewer::new(self)) } @@ -207,51 +276,22 @@ impl Session { .await } - pub async fn read_resource( - &self, - server: &str, - params: ReadResourceRequestParams, - ) -> anyhow::Result { - self.services - .latest_mcp_runtime() - .manager_arc() - .read_resource(server, params) - .await - } - - pub async fn call_tool( - &self, - server: &str, - tool: &str, - arguments: Option, - meta: Option, - ) -> anyhow::Result { - self.services - .latest_mcp_runtime() - .manager_arc() - .call_tool(server, tool, arguments, meta) - .await - } - async fn refresh_mcp_servers_inner( &self, turn_context: &TurnContext, - mut mcp_config: McpConfig, - configured_mcp_servers: HashMap, + mcp_config: McpConfig, + environments: &TurnEnvironmentSnapshot, + available_environment_ids: &[String], elicitation_reviewer: Option, - ) { - mcp_config.mcp_server_catalog = mcp_config - .mcp_server_catalog - .with_materialized_servers(configured_mcp_servers); - let mcp_config = Arc::new(mcp_config); + ) -> Arc { let auth = self.services.auth_manager.auth().await; + let mcp_config = Arc::new(mcp_config); let tool_plugin_provenance = codex_mcp::tool_plugin_provenance(&mcp_config); let mcp_servers = effective_mcp_servers(&mcp_config, auth.as_ref()); let environment_manager = self.services.turn_environments.environment_manager(); // TODO(anp): Migrate MCP runtime cwd plumbing to PathUri so foreign environment cwd // values can be used without falling back to the legacy host cwd. - let cwd = turn_context - .environments + let cwd = environments .primary() .and_then(|turn_environment| turn_environment.cwd().to_abs_path().ok()) .map(|cwd| cwd.to_path_buf()) @@ -305,10 +345,18 @@ impl Session { refreshed_manager .set_elicitations_auto_deny(current_manager.manager().elicitations_auto_deny()); } - self.services - .publish_mcp_runtime(mcp_config, mcp_runtime_context, refreshed_manager); + self.services.publish_mcp_runtime( + mcp_config, + mcp_runtime_context, + available_environment_ids.to_vec(), + refreshed_manager, + ) } + #[expect( + clippy::await_holding_invalid_type, + reason = "MCP runtime refresh and publication must remain serialized" + )] pub(crate) async fn refresh_mcp_servers_if_requested( &self, turn_context: &TurnContext, @@ -365,9 +413,33 @@ impl Session { return; } - let mcp_config = self.runtime_mcp_config(&refresh_config).await; - self.refresh_mcp_servers_inner(turn_context, mcp_config, mcp_servers, elicitation_reviewer) + let _guard = self.services.mcp_projection_lock.lock().await; + let available_environment_ids = self + .services + .latest_mcp_runtime() + .available_environment_ids() + .to_vec(); + let mut mcp_config = self + .services + .mcp_manager + .runtime_config_for_step( + &refresh_config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) .await; + mcp_config.mcp_server_catalog = mcp_config + .mcp_server_catalog + .with_materialized_servers(mcp_servers); + self.refresh_mcp_servers_inner( + turn_context, + mcp_config, + &turn_context.environments, + &available_environment_ids, + elicitation_reviewer, + ) + .await; } pub(crate) async fn set_openai_form_elicitation_support( @@ -398,16 +470,54 @@ impl Session { Ok(()) } + #[expect( + clippy::await_holding_invalid_type, + reason = "MCP runtime refresh and publication must remain serialized" + )] pub(crate) async fn refresh_mcp_servers_now( &self, turn_context: &TurnContext, refresh_config: &Config, elicitation_reviewer: Option, ) { - let mcp_config = self.runtime_mcp_config(refresh_config).await; - let mcp_servers = codex_mcp::configured_mcp_servers(&mcp_config); - self.refresh_mcp_servers_inner(turn_context, mcp_config, mcp_servers, elicitation_reviewer) + let _guard = self.services.mcp_projection_lock.lock().await; + let available_environment_ids = self + .services + .latest_mcp_runtime() + .available_environment_ids() + .to_vec(); + let mcp_config = self + .services + .mcp_manager + .runtime_config_for_step( + refresh_config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) .await; + self.refresh_mcp_servers_inner( + turn_context, + mcp_config, + &turn_context.environments, + &available_environment_ids, + elicitation_reviewer, + ) + .await; + } + + fn available_selected_environment_ids( + selected_capability_roots: &[ResolvedSelectedCapabilityRoot], + ) -> Vec { + let mut available = Vec::new(); + for root in selected_capability_roots { + let CapabilityRootLocation::Environment { environment_id, .. } = + &root.selected_root().location; + if !available.contains(environment_id) { + available.push(environment_id.clone()); + } + } + available } #[cfg(test)] diff --git a/codex-rs/core/src/session/mcp_runtime.rs b/codex-rs/core/src/session/mcp_runtime.rs index dc5ea011c..34cf07cc3 100644 --- a/codex-rs/core/src/session/mcp_runtime.rs +++ b/codex-rs/core/src/session/mcp_runtime.rs @@ -10,6 +10,7 @@ pub struct McpRuntimeSnapshot { config: Arc, manager: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, } impl McpRuntimeSnapshot { @@ -17,11 +18,13 @@ impl McpRuntimeSnapshot { config: Arc, manager: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, ) -> Self { Self { config, manager, runtime_context, + available_environment_ids, } } @@ -41,6 +44,10 @@ impl McpRuntimeSnapshot { &self.runtime_context } + pub(crate) fn available_environment_ids(&self) -> &[String] { + &self.available_environment_ids + } + #[cfg(test)] pub(crate) fn new_uninitialized_for_test(config: &crate::config::Config) -> Arc { use codex_exec_server::EnvironmentManager; @@ -81,6 +88,7 @@ impl McpRuntimeSnapshot { Arc::new(mcp_config), Arc::new(manager), runtime_context, + Vec::new(), )) } } diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 9421e07a6..000825754 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -100,7 +100,6 @@ use codex_protocol::dynamic_tools::DynamicToolResponse; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::items::TurnItem; use codex_protocol::items::UserMessageItem; -use codex_protocol::mcp::CallToolResult; use codex_protocol::models::ActivePermissionProfile; use codex_protocol::models::AdditionalPermissionProfile; use codex_protocol::models::BaseInstructions; @@ -158,8 +157,6 @@ use codex_utils_path_uri::PathUri; use futures::future::BoxFuture; use futures::future::Shared; use futures::prelude::*; -use rmcp::model::ReadResourceRequestParams; -use rmcp::model::ReadResourceResult; use rmcp::model::RequestId; use serde_json::Value; use tokio::sync::Mutex; @@ -2825,7 +2822,7 @@ impl Session { /// `run_turn` and pass the result down; standalone request or history boundaries may capture /// their own step. pub(crate) async fn capture_step_context( - &self, + self: &Arc, turn_context: Arc, ) -> Arc { let deferred_executor_enabled = turn_context @@ -2846,15 +2843,15 @@ impl Session { } let loaded_agents_md = self.services.agents_md_manager.get_loaded().await; let selected_capability_roots = self - .services - .turn_environments - .environment_manager() - .resolve_selected_capability_roots( - &self.services.selected_capability_roots, - &environments.captured_environments(), + .resolve_selected_capability_roots_for_step(&environments) + .await; + let mcp = self + .mcp_runtime_for_step( + turn_context.as_ref(), + &environments, + &selected_capability_roots, ) .await; - let mcp = self.services.latest_mcp_runtime(); Arc::new(StepContext::new( turn_context, environments, diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 2c9e57036..0e1609207 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -661,6 +661,7 @@ impl Session { let config_for_mcp = Arc::clone(&config); let mcp_manager_for_mcp = Arc::clone(&mcp_manager); let mcp_thread_init_for_startup = &mcp_thread_init; + let thread_extension_data_for_mcp = &thread_extension_data; let mcp_runtime_cwd = session_configuration .environment_selections() .first() @@ -673,7 +674,12 @@ impl Session { let auth_and_mcp_fut = async move { let auth = auth_manager_clone.auth().await; let mcp_config = mcp_manager_for_mcp - .runtime_config_for_thread(&config_for_mcp, mcp_thread_init_for_startup) + .runtime_config_for_step( + &config_for_mcp, + mcp_thread_init_for_startup, + thread_extension_data_for_mcp, + /*available_environment_ids*/ &[], + ) .await; let mcp_servers = codex_mcp::effective_mcp_servers(&mcp_config, auth.as_ref()); let tool_plugin_provenance = codex_mcp::tool_plugin_provenance(&mcp_config); @@ -1039,6 +1045,7 @@ impl Session { // setup is straightforward enough and performs well. mcp_connection_manager, mcp_runtime: arc_swap::ArcSwapOption::empty(), + mcp_projection_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -1210,6 +1217,7 @@ impl Session { .install_mcp_connection_manager( Arc::new(mcp_config), mcp_runtime_context, + /*available_environment_ids*/ Vec::new(), mcp_connection_manager, ) .await?; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index d0aac0424..57f1ce908 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -5372,6 +5372,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { let services = SessionServices { mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from(mcp_runtime.manager_arc())), mcp_runtime: arc_swap::ArcSwapOption::from(Some(mcp_runtime)), + mcp_projection_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -7446,6 +7447,7 @@ where let services = SessionServices { mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from(mcp_runtime.manager_arc())), mcp_runtime: arc_swap::ArcSwapOption::from(Some(mcp_runtime)), + mcp_projection_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -7603,6 +7605,7 @@ pub(crate) async fn make_session_and_context_with_rx() -> ( #[tokio::test] async fn refresh_mcp_servers_keeps_the_previous_runtime_alive() { let (session, turn_context) = make_session_and_context().await; + let session = Arc::new(session); let turn_context = Arc::new(turn_context); let old_runtime = session.services.latest_mcp_runtime(); let step_context = session diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 2cb1f2cad..7b509ae42 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -171,8 +171,13 @@ pub(crate) async fn run_turn( .record_context_updates_and_set_reference_context_item(first_step_context.as_ref()) .await; - let Some((injection_items, explicitly_enabled_connectors)) = - build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await + let Some((injection_items, explicitly_enabled_connectors)) = build_skills_and_plugins( + &sess, + first_step_context.as_ref(), + &input, + &cancellation_token, + ) + .await else { return Ok(None); }; @@ -504,10 +509,11 @@ async fn run_hooks_and_record_inputs( #[instrument(level = "trace", skip_all)] async fn build_skills_and_plugins( sess: &Arc, - turn_context: &TurnContext, + step_context: &StepContext, input: &[TurnInput], cancellation_token: &CancellationToken, ) -> Option<(Vec, HashSet)> { + let turn_context = step_context.turn.as_ref(); // Guardian input embeds the parent transcript as untrusted evidence. Do not interpret skill or // plugin mentions from that generated prompt as requests to inject additional instructions. if crate::guardian::is_guardian_reviewer_source(&turn_context.session_source) { @@ -538,17 +544,14 @@ async fn build_skills_and_plugins( // enabled plugins, then converted into turn-scoped guidance below. let mentioned_plugins = collect_explicit_plugin_mentions(&user_input, loaded_plugins.capability_summaries()); - let connector_snapshot = codex_connectors::ConnectorSnapshot::from_plugin_capability_summaries( - loaded_plugins.capability_summaries(), - ); + let connector_snapshot = step_context.mcp.config().connector_snapshot.clone(); let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() { // Plugin mentions need raw MCP/app inventory even when app tools // are normally hidden so we can describe the plugin's currently // usable capabilities for this turn. - match sess - .services - .mcp_connection_manager - .load_full() + match step_context + .mcp + .manager_arc() .list_all_tools() .or_cancel(cancellation_token) .await @@ -1189,9 +1192,7 @@ pub(crate) async fn built_tools( .plugins_for_config(&turn_context.config.plugins_config_input()) .instrument(trace_span!("built_tools.load_plugins")) .await; - let connector_snapshot = codex_connectors::ConnectorSnapshot::from_plugin_capability_summaries( - loaded_plugins.capability_summaries(), - ); + let connector_snapshot = step_context.mcp.config().connector_snapshot.clone(); let apps_enabled = turn_context.apps_enabled(); let accessible_connectors = diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 1bee45cce..a611b9488 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -51,6 +51,8 @@ pub(crate) struct SessionServices { pub(crate) mcp_connection_manager: Arc>, /// The latest atomically published MCP config and manager pair. pub(crate) mcp_runtime: ArcSwapOption, + /// Serializes environment-driven runtime rebuilds. + pub(crate) mcp_projection_lock: Mutex<()>, pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, #[cfg_attr(not(unix), allow(dead_code))] @@ -106,9 +108,11 @@ impl SessionServices { &self, config: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, manager: McpConnectionManager, ) -> Result<()> { - let runtime = self.publish_mcp_runtime(config, runtime_context, manager); + let runtime = + self.publish_mcp_runtime(config, runtime_context, available_environment_ids, manager); runtime.manager().validate_required_servers().await } @@ -116,13 +120,19 @@ impl SessionServices { &self, config: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, manager: McpConnectionManager, ) -> Arc { let manager = Arc::new(manager); // Publish the manager for legacy resource clients first. Once the paired snapshot is // visible, every model-scoped consumer observes this exact manager. self.mcp_connection_manager.store(Arc::clone(&manager)); - let runtime = Arc::new(McpRuntimeSnapshot::new(config, manager, runtime_context)); + let runtime = Arc::new(McpRuntimeSnapshot::new( + config, + manager, + runtime_context, + available_environment_ids, + )); self.mcp_runtime.store(Some(Arc::clone(&runtime))); runtime } diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 9bccf8b6c..5148f1625 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -584,8 +584,28 @@ async fn start_thread_seeds_extension_data_for_mcp_and_lifecycle_contributors() }) .await .expect("start second thread"); - let first_resolved = first_thread.thread.runtime_mcp_config(&config).await; - let second_resolved = second_thread.thread.runtime_mcp_config(&config).await; + let first_session = &first_thread.thread.codex.session; + let first_resolved = first_session + .services + .mcp_manager + .runtime_config_for_step( + &config, + &first_session.services.mcp_thread_init, + &first_session.services.thread_extension_data, + /*available_environment_ids*/ &[], + ) + .await; + let second_session = &second_thread.thread.codex.session; + let second_resolved = second_session + .services + .mcp_manager + .runtime_config_for_step( + &config, + &second_session.services.mcp_thread_init, + &second_session.services.thread_extension_data, + /*available_environment_ids*/ &[], + ) + .await; assert_eq!( *lifecycle_observed diff --git a/codex-rs/ext/extension-api/src/contributors/mcp.rs b/codex-rs/ext/extension-api/src/contributors/mcp.rs index 91e9cb459..499657f55 100644 --- a/codex-rs/ext/extension-api/src/contributors/mcp.rs +++ b/codex-rs/ext/extension-api/src/contributors/mcp.rs @@ -1,17 +1,22 @@ use codex_config::McpServerConfig; +use crate::ExtensionData; use crate::ExtensionDataInit; /// Input supplied while resolving MCP server contributions. /// -/// Thread-scoped implementations can read the immutable host-seeded inputs -/// through [`Self::thread_init`]. Implementations should not retain borrowed -/// context after contribution completes. +/// Thread-scoped implementations can read stable host inputs through [`Self::thread_init`] and +/// keep their cache in [`Self::thread_store`]. Implementations should not retain borrowed context +/// after contribution completes. pub struct McpServerContributionContext<'a, C> { /// Host configuration visible during MCP resolution. config: &'a C, - /// Initial inputs for the active thread, when resolution is thread-scoped. + /// Extension-owned data for the active thread, when resolution is thread-scoped. + thread_store: Option<&'a ExtensionData>, + /// Stable host inputs for the active thread, when resolution is thread-scoped. thread_init: Option<&'a ExtensionDataInit>, + /// Environment IDs whose selected roots may contribute to this exact step. + available_environment_ids: Option<&'a [String]>, } impl Clone for McpServerContributionContext<'_, C> { @@ -27,15 +32,24 @@ impl<'a, C> McpServerContributionContext<'a, C> { pub fn global(config: &'a C) -> Self { Self { config, + thread_store: None, thread_init: None, + available_environment_ids: None, } } - /// Creates context for one active thread runtime. - pub fn for_thread(config: &'a C, thread_init: &'a ExtensionDataInit) -> Self { + /// Creates context for one model step using only currently available environments. + pub fn for_step( + config: &'a C, + thread_init: &'a ExtensionDataInit, + thread_store: &'a ExtensionData, + available_environment_ids: &'a [String], + ) -> Self { Self { config, + thread_store: Some(thread_store), thread_init: Some(thread_init), + available_environment_ids: Some(available_environment_ids), } } @@ -44,10 +58,23 @@ impl<'a, C> McpServerContributionContext<'a, C> { self.config } - /// Returns the frozen initial inputs when resolving for a running thread. + /// Returns extension-owned state when resolving for a running thread. + pub fn thread_store(&self) -> Option<&'a ExtensionData> { + self.thread_store + } + + /// Returns stable host inputs when resolving for a running thread. pub fn thread_init(&self) -> Option<&'a ExtensionDataInit> { self.thread_init } + + /// Returns the exact environment availability projection for a model step. + /// + /// `Some` means contributors must omit selected roots whose environment ID is absent from the + /// slice. Global resolution returns `None` because it has no thread environments. + pub fn available_environment_ids(&self) -> Option<&'a [String]> { + self.available_environment_ids + } } /// One extension-owned overlay for the runtime MCP server configuration. @@ -66,6 +93,12 @@ pub enum McpServerContribution { selection_order: usize, config: Box, }, + /// Adds connector IDs declared by a plugin selected for this thread. + SelectedPluginConnectors { + plugin_id: String, + plugin_display_name: String, + connector_ids: Vec, + }, /// Removes a named MCP server. Remove { name: String }, } diff --git a/codex-rs/ext/mcp/Cargo.toml b/codex-rs/ext/mcp/Cargo.toml index b98c0905d..d918bd5bd 100644 --- a/codex-rs/ext/mcp/Cargo.toml +++ b/codex-rs/ext/mcp/Cargo.toml @@ -16,6 +16,7 @@ workspace = true codex-core = { workspace = true } codex-core-plugins = { workspace = true } codex-config = { workspace = true } +codex-connectors-extension = { workspace = true } codex-exec-server = { workspace = true } codex-extension-api = { workspace = true } codex-features = { workspace = true } diff --git a/codex-rs/ext/mcp/src/executor_plugin.rs b/codex-rs/ext/mcp/src/executor_plugin.rs index 69d1a47b3..32637b4f4 100644 --- a/codex-rs/ext/mcp/src/executor_plugin.rs +++ b/codex-rs/ext/mcp/src/executor_plugin.rs @@ -1,44 +1,47 @@ +use codex_connectors_extension::ExecutorPluginConnectorProvider; use codex_core::config::Config; use codex_core_plugins::ExecutorPluginProvider; use codex_exec_server::EnvironmentManager; -use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionFuture; use codex_extension_api::McpServerContribution; use codex_extension_api::McpServerContributionContext; use codex_extension_api::McpServerContributor; +use codex_protocol::capabilities::CapabilityRootLocation; use codex_protocol::capabilities::SelectedCapabilityRoot; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::OnceCell; +use std::sync::Mutex; use self::provider::ExecutorPluginMcpProvider; mod provider; -/// Frozen MCP declarations for one selected package. +/// Frozen MCP and connector declarations for one selected package. /// /// Each server config retains the stable logical environment ID. Reconnection may replace the /// concrete environment instance without changing that authority. #[derive(Clone)] -struct SelectedPluginMcpServers { +struct SelectedPluginMetadata { plugin_id: String, plugin_display_name: String, - selection_order: usize, servers: Vec<(String, codex_config::McpServerConfig)>, + connector_ids: Vec, } #[derive(Default)] pub(crate) struct SelectedExecutorPluginMcpState { - snapshot: OnceCell>, + cache: Mutex>, } -pub(crate) fn seed_thread_state(thread_init: &mut ExtensionDataInit) { - thread_init.insert(SelectedExecutorPluginMcpState::default()); +struct CachedSelectedRoot { + root: SelectedCapabilityRoot, + metadata: Option, } pub(crate) struct SelectedExecutorPluginMcpContributor { plugin_provider: ExecutorPluginProvider, mcp_provider: ExecutorPluginMcpProvider, + connector_provider: ExecutorPluginConnectorProvider, } impl SelectedExecutorPluginMcpContributor { @@ -46,46 +49,87 @@ impl SelectedExecutorPluginMcpContributor { Self { plugin_provider: ExecutorPluginProvider::new(Arc::clone(&environment_manager)), mcp_provider: ExecutorPluginMcpProvider, + connector_provider: ExecutorPluginConnectorProvider, } } - async fn resolve_snapshot( + /// Returns metadata for one stable selected root. + /// + /// Successful resolution, including a root that is not a plugin or declares no capabilities, + /// is cached until the thread state is dropped. Environment availability never invalidates + /// this cache; it only controls whether the cached metadata is projected into a model step. + async fn metadata_for_root( &self, - selected_roots: &[SelectedCapabilityRoot], - ) -> Vec { - let mut snapshot = Vec::new(); + state: &SelectedExecutorPluginMcpState, + selected_root: &SelectedCapabilityRoot, + ) -> Option { + if let Some(cached) = state + .cache + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .iter() + .find(|cached| cached.root == *selected_root) + { + return cached.metadata.clone(); + } - for (selection_order, selected_root) in selected_roots.iter().enumerate() { - let plugin = match self.plugin_provider.resolve_bound(selected_root).await { - Ok(Some(plugin)) => plugin, - Ok(None) => continue, - Err(err) => { - tracing::warn!( - selected_root = selected_root.id, - error = %err, - "failed to resolve selected executor plugin for MCP discovery" - ); - continue; - } - }; - match self.mcp_provider.load(&plugin).await { - Ok(servers) => snapshot.push(SelectedPluginMcpServers { - plugin_id: plugin.plugin().selected_root_id().to_string(), - plugin_display_name: plugin.plugin().manifest().display_name().to_string(), - selection_order, - servers, - }), - Err(err) => { + let plugin = match self.plugin_provider.resolve_bound(selected_root).await { + Ok(plugin) => plugin, + Err(err) => { + tracing::warn!( + selected_root = selected_root.id, + error = %err, + "failed to resolve selected executor plugin" + ); + return None; + } + }; + let metadata = match plugin { + Some(plugin) => { + let servers = self.mcp_provider.load(&plugin).await.unwrap_or_else(|err| { tracing::warn!( selected_root = selected_root.id, error = %err, "failed to load selected executor plugin MCP servers" ); - } + Vec::new() + }); + let connector_ids = self + .connector_provider + .load(&plugin) + .await + .unwrap_or_else(|err| { + tracing::warn!( + selected_root = selected_root.id, + error = %err, + "failed to load selected executor plugin connectors" + ); + Vec::new() + }) + .into_iter() + .map(|declaration| declaration.connector_id.0) + .collect(); + Some(SelectedPluginMetadata { + plugin_id: plugin.plugin().selected_root_id().to_string(), + plugin_display_name: plugin.plugin().manifest().display_name().to_string(), + servers, + connector_ids, + }) } + None => None, + }; + let mut cache = state + .cache + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if let Some(cached) = cache.iter().find(|cached| cached.root == *selected_root) { + return cached.metadata.clone(); } - - snapshot + cache.push(CachedSelectedRoot { + root: selected_root.clone(), + metadata: metadata.clone(), + }); + metadata } } @@ -102,20 +146,31 @@ impl McpServerContributor for SelectedExecutorPluginMcpContributor { let Some(thread_init) = context.thread_init() else { return Vec::new(); }; + let Some(thread_store) = context.thread_store() else { + return Vec::new(); + }; let Some(selected_roots) = thread_init.get::>() else { return Vec::new(); }; - let Some(state) = thread_init.get::() else { - tracing::warn!("selected executor plugin MCP state was not initialized"); - return Vec::new(); - }; - let snapshot = state - .snapshot - .get_or_init(|| self.resolve_snapshot(selected_roots.as_ref())) - .await; + let state = thread_store.get_or_init(SelectedExecutorPluginMcpState::default); let mut contributions = Vec::new(); - for plugin in snapshot { + for (selection_order, selected_root) in selected_roots.iter().enumerate() { + let CapabilityRootLocation::Environment { environment_id, .. } = + &selected_root.location; + if context + .available_environment_ids() + .is_some_and(|available| { + !available + .iter() + .any(|available| available == environment_id) + }) + { + continue; + } + let Some(plugin) = self.metadata_for_root(&state, selected_root).await else { + continue; + }; let mut servers = plugin.servers.iter().cloned().collect::>(); context .config() @@ -127,10 +182,17 @@ impl McpServerContributor for SelectedExecutorPluginMcpContributor { name, plugin_id: plugin.plugin_id.clone(), plugin_display_name: plugin.plugin_display_name.clone(), - selection_order: plugin.selection_order, + selection_order, config: Box::new(config), } })); + if !plugin.connector_ids.is_empty() { + contributions.push(McpServerContribution::SelectedPluginConnectors { + plugin_id: plugin.plugin_id, + plugin_display_name: plugin.plugin_display_name, + connector_ids: plugin.connector_ids, + }); + } } contributions diff --git a/codex-rs/ext/mcp/src/lib.rs b/codex-rs/ext/mcp/src/lib.rs index e32316ac3..9c22bad5a 100644 --- a/codex-rs/ext/mcp/src/lib.rs +++ b/codex-rs/ext/mcp/src/lib.rs @@ -51,10 +51,3 @@ pub fn install_executor_plugins( executor_plugin::SelectedExecutorPluginMcpContributor::new(environment_manager), )); } - -/// Seeds the per-thread snapshot used by selected executor plugin MCP discovery. -pub fn initialize_executor_plugin_thread_data( - thread_init: &mut codex_extension_api::ExtensionDataInit, -) { - executor_plugin::seed_thread_state(thread_init); -} diff --git a/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs b/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs index 7bdbc83d6..65a4248a3 100644 --- a/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs +++ b/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs @@ -3,6 +3,7 @@ use codex_core::config::Config; use codex_core::config::ConfigBuilder; use codex_exec_server::EnvironmentManager; use codex_exec_server::LOCAL_ENVIRONMENT_ID; +use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionRegistryBuilder; use codex_extension_api::McpServerContribution; @@ -109,12 +110,15 @@ async fn selected_plugin_contributions( path: PathUri::from_host_native_path(plugin_root)?, }, }]); - codex_mcp_extension::initialize_executor_plugin_thread_data(&mut thread_init); + let thread_store = ExtensionData::new_with_init("test-thread", thread_init.clone()); + let available_environment_ids = vec![LOCAL_ENVIRONMENT_ID.to_string()]; Ok(registry.mcp_server_contributors()[0] - .contribute(McpServerContributionContext::for_thread( + .contribute(McpServerContributionContext::for_step( config, &thread_init, + &thread_store, + &available_environment_ids, )) .await .into_iter() @@ -132,7 +136,9 @@ async fn selected_plugin_contributions( selection_order, enabled: config.enabled, }, - McpServerContribution::Set { .. } | McpServerContribution::Remove { .. } => { + McpServerContribution::Set { .. } + | McpServerContribution::SelectedPluginConnectors { .. } + | McpServerContribution::Remove { .. } => { panic!("expected selected plugin contribution") } })