From 41b4fabbb4fb2a5d9d956d12c706daee992a76e2 Mon Sep 17 00:00:00 2001
From: Charlie Marsh
Date: Wed, 10 Jun 2026 08:33:21 -0700
Subject: [PATCH] Use latest-wins MCP manager replacement (#27259)

## Summary

We originally addressed startup prewarming holding the read side of `RwLock<McpConnectionManager>` by snapshotting tool-list state. Review feedback identified the broader ownership problem: the outer synchronization should only publish or retrieve the current manager, while MCP operations rely on the manager's internal synchronization. A follow-up preserved operation retirement with a separate gate, but further review questioned whether that synchronization was actually required and whether we could support latest-wins replacement instead.

This PR now stores the current MCP manager in `ArcSwap<McpConnectionManager>`. Each operation uses `load_full()` to obtain an owned `Arc<McpConnectionManager>`, then performs MCP I/O without retaining the publication mechanism. Refresh cancels obsolete startup work, constructs a replacement, and atomically publishes it. New operations see the latest manager, while operations that already loaded the previous manager retain a valid handle. Refresh happens at a turn boundary, so there should be no active user tool calls to drain.

Git history supports dropping the outer `RwLock`. It was introduced in `03ffe4d595` on November 17, 2025 for non-blocking MCP startup: the session published an empty manager, startup initialized that same object while holding the write lock, and readers waited for initialization. `7cd2e84026` on February 19, 2026 removed that two-phase initialization in favor of constructing a fresh manager and swapping it in, explicitly noting that `Option` or `OnceCell` could replace the placeholder design. Hot reload later reused the existing lock to publish a replacement, but I found no indication that the lock was introduced to guarantee in-flight tool calls finish before refresh or shutdown.

Terminal shutdown remains separate from refresh: it aborts startup prewarming and active tasks before shutting down the current manager, so tool calls may be interrupted and no model WebSocket work continues after shutdown. Focused regression coverage exercises pending tool-list cancellation, deferred refresh, and startup-prewarm shutdown. --- codex-rs/codex-mcp/src/connection_manager.rs | 20 ++------ .../codex-mcp/src/connection_manager_tests.rs | 42 +++++++++++++++++ codex-rs/core/src/codex_delegate_tests.rs | 40 ++-------------- codex-rs/core/src/connectors.rs | 2 +- codex-rs/core/src/mcp_tool_call.rs | 35 +++----------- codex-rs/core/src/mcp_tool_call_tests.rs | 5 +- codex-rs/core/src/session/handlers.rs | 13 ++++-- codex-rs/core/src/session/mcp.rs | 44 ++++-------------- codex-rs/core/src/session/mod.rs | 8 +--- codex-rs/core/src/session/session.rs | 4 +- codex-rs/core/src/session/tests.rs | 18 +++----- codex-rs/core/src/session/turn.rs | 19 +------- codex-rs/core/src/session/turn_context.rs | 2 +- codex-rs/core/src/session_startup_prewarm.rs | 10 +++- codex-rs/core/src/state/service.rs | 13 ++---- .../list_mcp_resource_templates.rs | 7 +-- .../mcp_resource/list_mcp_resources.rs | 7 +-- .../tools/handlers/request_plugin_install.rs | 6 +-- codex-rs/core/src/tools/router_tests.rs | 7 +-- codex-rs/core/tests/suite/rmcp_client.rs | 46 +++++++++++++++++++ 20 files changed, 156 insertions(+), 192 deletions(-) diff --git a/codex-rs/codex-mcp/src/connection_manager.rs b/codex-rs/codex-mcp/src/connection_manager.rs index 5b7b6b3e8..659d35718 100644 --- a/codex-rs/codex-mcp/src/connection_manager.rs +++ b/codex-rs/codex-mcp/src/connection_manager.rs @@ -348,22 +348,12 @@ impl McpConnectionManager { !self.clients.is_empty() } - /// Drain all MCP clients from this manager and return a future that stops - /// them and terminates their stdio server processes. 
- pub fn begin_shutdown(&mut self) -> impl std::future::Future + Send + 'static { - self.startup_cancellation_token.cancel(); - let clients = std::mem::take(&mut self.clients); - self.server_metadata.clear(); - async move { - for client in clients.into_values() { - client.shutdown().await; - } - } - } - /// Stop all MCP clients owned by this manager and terminate stdio server processes. - pub async fn shutdown(&mut self) { - self.begin_shutdown().await; + pub async fn shutdown(&self) { + self.startup_cancellation_token.cancel(); + for client in self.clients.values() { + client.shutdown().await; + } } pub fn server_origin(&self, server_name: &str) -> Option<&str> { diff --git a/codex-rs/codex-mcp/src/connection_manager_tests.rs b/codex-rs/codex-mcp/src/connection_manager_tests.rs index 4b90337ac..ea405ced9 100644 --- a/codex-rs/codex-mcp/src/connection_manager_tests.rs +++ b/codex-rs/codex-mcp/src/connection_manager_tests.rs @@ -980,6 +980,48 @@ async fn list_all_tools_blocks_while_client_is_pending_without_cached_tool_info_ assert!(timeout_result.is_err()); } +#[tokio::test] +async fn shutdown_cancels_pending_tool_listing() { + let cancel_token = CancellationToken::new(); + let cancel_token_for_startup = cancel_token.clone(); + let (started_tx, started_rx) = tokio::sync::oneshot::channel(); + let pending_client = async move { + let _ = started_tx.send(()); + cancel_token_for_startup.cancelled().await; + Err(StartupOutcomeError::Cancelled) + } + .boxed() + .shared(); + let approval_policy = Constrained::allow_any(AskForApproval::OnFailure); + let permission_profile = Constrained::allow_any(PermissionProfile::default()); + let mut manager = McpConnectionManager::new_uninitialized( + &approval_policy, + &permission_profile, + /*prefix_mcp_tool_names*/ true, + ); + manager.clients.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + AsyncManagedClient { + client: pending_client, + cached_tool_info_snapshot: None, + cached_server_info: None, + startup_complete: 
Arc::new(std::sync::atomic::AtomicBool::new(false)), + tool_plugin_provenance: Arc::new(ToolPluginProvenance::default()), + cancel_token, + }, + ); + let manager = Arc::new(manager); + let manager_for_list = Arc::clone(&manager); + let list_task = tokio::spawn(async move { manager_for_list.list_all_tools().await }); + + started_rx.await.expect("tool listing should start"); + tokio::time::timeout(Duration::from_secs(1), manager.shutdown()) + .await + .expect("shutdown should cancel speculative tool listing"); + let tools = list_task.await.expect("tool listing task should not panic"); + assert!(tools.is_empty()); +} + #[tokio::test] async fn list_all_tools_does_not_block_when_cached_tool_info_snapshot_is_empty() { let pending_client = futures::future::pending::>() diff --git a/codex-rs/core/src/codex_delegate_tests.rs b/codex-rs/core/src/codex_delegate_tests.rs index 7dabb09d3..cb7767a16 100644 --- a/codex-rs/core/src/codex_delegate_tests.rs +++ b/codex-rs/core/src/codex_delegate_tests.rs @@ -452,7 +452,7 @@ async fn delegated_mcp_guardian_abort_returns_synthetic_decline_answer() { } #[tokio::test] -async fn delegated_mcp_user_reviewer_waits_for_metadata_lookup() { +async fn delegated_mcp_user_reviewer_returns_none_without_metadata() { let (parent_session, parent_ctx, _rx_events) = crate::session::tests::make_session_and_context_with_rx().await; let pending_mcp_invocations = Arc::new(Mutex::new(HashMap::from([( @@ -464,21 +464,6 @@ async fn delegated_mcp_user_reviewer_waits_for_metadata_lookup() { }, )]))); let cancel_token = CancellationToken::new(); - let manager = Arc::clone(&parent_session.services.mcp_connection_manager); - let (manager_locked_tx, manager_locked_rx) = std::sync::mpsc::sync_channel(0); - let (release_manager_tx, release_manager_rx) = std::sync::mpsc::sync_channel(0); - let manager_lock = tokio::task::spawn_blocking(move || { - let _manager_guard = manager.blocking_write(); - manager_locked_tx - .send(()) - .expect("manager lock receiver should 
remain open"); - release_manager_rx - .recv() - .expect("manager lock release sender should remain open"); - }); - manager_locked_rx - .recv_timeout(Duration::from_secs(1)) - .expect("manager write lock should be acquired"); let event = RequestUserInputEvent { call_id: "call-1".to_string(), @@ -498,24 +483,7 @@ async fn delegated_mcp_user_reviewer_waits_for_metadata_lookup() { &pending_mcp_invocations, &event, &cancel_token, - ); - tokio::pin!(response); - assert!( - timeout(Duration::from_millis(100), &mut response) - .await - .is_err(), - "manual reviewer should wait for MCP metadata" - ); - release_manager_tx - .send(()) - .expect("manager lock holder should remain open"); - manager_lock - .await - .expect("manager lock task should not panic"); - assert_eq!( - timeout(Duration::from_secs(1), response) - .await - .expect("manual reviewer should finish after MCP metadata lookup"), - None - ); + ) + .await; + assert_eq!(response, None); } diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs index 6d6ca52d8..d8df72855 100644 --- a/codex-rs/core/src/connectors.rs +++ b/codex-rs/core/src/connectors.rs @@ -287,7 +287,7 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_mcp_manager( drop(rx_event); let cancel_token = CancellationToken::new(); - let mut mcp_connection_manager = McpConnectionManager::new( + let mcp_connection_manager = McpConnectionManager::new( &mcp_servers, config.mcp_oauth_credentials_store_mode, auth_status_entries, diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index b9a8d4ff0..e5273a666 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -319,8 +319,7 @@ async fn handle_approved_mcp_tool_call( let server_origin = sess .services .mcp_connection_manager - .read() - .await + .load_full() .server_origin(&server) .map(str::to_string); @@ -606,8 +605,7 @@ async fn maybe_request_codex_apps_auth_elicitation( if !sess .services 
.mcp_connection_manager - .read() - .await + .load_full() .is_host_owned_codex_apps_server(server) { return result; @@ -669,13 +667,9 @@ async fn maybe_request_codex_apps_auth_elicitation( auth_elicitation_completed_result(&plan.auth_failure, result.meta) } -#[expect( - clippy::await_holding_invalid_type, - reason = "Codex Apps cache refresh reads through the session-owned manager guard" -)] async fn refresh_codex_apps_after_connector_auth(sess: &Session, turn_context: &TurnContext) { let mcp_tools_result = { - let manager = sess.services.mcp_connection_manager.read().await; + let manager = sess.services.mcp_connection_manager.load_full(); manager.hard_refresh_codex_apps_tools_cache().await }; @@ -694,10 +688,6 @@ async fn refresh_codex_apps_after_connector_auth(sess: &Session, turn_context: & } } -#[expect( - clippy::await_holding_invalid_type, - reason = "MCP sandbox metadata reads through the session-owned manager guard" -)] async fn augment_mcp_tool_request_meta_with_sandbox_state( sess: &Session, turn_context: &TurnContext, @@ -707,8 +697,7 @@ async fn augment_mcp_tool_request_meta_with_sandbox_state( let supports_sandbox_state_meta = sess .services .mcp_connection_manager - .read() - .await + .load_full() .server_supports_sandbox_state_meta_capability(server) .await .unwrap_or(false); @@ -757,8 +746,7 @@ async fn maybe_mark_thread_memory_mode_polluted( let pollutes_memory = sess .services .mcp_connection_manager - .read() - .await + .load_full() .server_pollutes_memory(server); if !pollutes_memory { return; @@ -1414,17 +1402,13 @@ async fn mcp_tool_approval_decision_from_guardian( } } -#[expect( - clippy::await_holding_invalid_type, - reason = "MCP approval metadata reads through the session-owned manager guard" -)] pub(crate) async fn lookup_mcp_tool_metadata( sess: &Session, turn_context: &TurnContext, server: &str, tool_name: &str, ) -> Option { - let manager = sess.services.mcp_connection_manager.read().await; + let manager = 
sess.services.mcp_connection_manager.load_full(); let plugin_id = manager .plugin_id_for_mcp_server_name(server) .map(str::to_string); @@ -1509,10 +1493,6 @@ fn get_mcp_app_resource_uri( }) } -#[expect( - clippy::await_holding_invalid_type, - reason = "MCP app metadata reads through the session-owned manager guard" -)] async fn lookup_mcp_app_usage_metadata( sess: &Session, server: &str, @@ -1521,8 +1501,7 @@ async fn lookup_mcp_app_usage_metadata( let tools = sess .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_tools() .await; diff --git a/codex-rs/core/src/mcp_tool_call_tests.rs b/codex-rs/core/src/mcp_tool_call_tests.rs index da54fe187..723948b7b 100644 --- a/codex-rs/core/src/mcp_tool_call_tests.rs +++ b/codex-rs/core/src/mcp_tool_call_tests.rs @@ -1280,7 +1280,10 @@ async fn install_host_owned_codex_apps_manager(session: &Session, turn_context: /*elicitation_reviewer*/ None, ) .await; - *session.services.mcp_connection_manager.write().await = manager; + session + .services + .mcp_connection_manager + .store(Arc::new(manager)); } #[tokio::test] diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index 10b0b27cc..bfb87a0ff 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -580,6 +580,9 @@ pub async fn set_thread_memory_mode(sess: &Arc, sub_id: String, mode: T } async fn shutdown_session_runtime(sess: &Arc) { + if let Some(startup_prewarm) = sess.take_session_startup_prewarm().await { + startup_prewarm.abort().await; + } sess.abort_all_tasks(TurnAbortReason::Interrupted).await; let _ = sess.conversation.shutdown().await; sess.services @@ -589,11 +592,11 @@ async fn shutdown_session_runtime(sess: &Arc) { if let Err(err) = sess.services.code_mode_service.shutdown().await { warn!("failed to shutdown code mode session: {err}"); } - let mcp_shutdown = { - let mut manager = sess.services.mcp_connection_manager.write().await; - manager.begin_shutdown() - 
}; - mcp_shutdown.await; + sess.services + .mcp_connection_manager + .load_full() + .shutdown() + .await; sess.guardian_review_session.shutdown().await; } diff --git a/codex-rs/core/src/session/mcp.rs b/codex-rs/core/src/session/mcp.rs index 779813641..39333801c 100644 --- a/codex-rs/core/src/session/mcp.rs +++ b/codex-rs/core/src/session/mcp.rs @@ -91,8 +91,7 @@ impl Session { if self .services .mcp_connection_manager - .read() - .await + .load_full() .elicitations_auto_deny() { return McpServerElicitationOutcome { @@ -226,16 +225,11 @@ impl Session { self.services .mcp_connection_manager - .read() - .await + .load_full() .resolve_elicitation(server_name, id, response) .await } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource calls are serialized through the session-owned manager guard" - )] pub async fn list_resources( &self, server: &str, @@ -243,16 +237,11 @@ impl Session { ) -> anyhow::Result { self.services .mcp_connection_manager - .read() - .await + .load_full() .list_resources(server, params) .await } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource calls are serialized through the session-owned manager guard" - )] pub async fn list_resource_templates( &self, server: &str, @@ -260,16 +249,11 @@ impl Session { ) -> anyhow::Result { self.services .mcp_connection_manager - .read() - .await + .load_full() .list_resource_templates(server, params) .await } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource calls are serialized through the session-owned manager guard" - )] pub async fn read_resource( &self, server: &str, @@ -277,16 +261,11 @@ impl Session { ) -> anyhow::Result { self.services .mcp_connection_manager - .read() - .await + .load_full() .read_resource(server, params) .await } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP tool calls are serialized through the session-owned manager guard" - )] pub async fn call_tool( &self, server: &str, @@ -296,8 +275,7 @@ 
impl Session { ) -> anyhow::Result { self.services .mcp_connection_manager - .read() - .await + .load_full() .call_tool(server, tool, arguments, meta) .await } @@ -366,14 +344,12 @@ impl Session { ) .await; { - let current_manager = self.services.mcp_connection_manager.read().await; + let current_manager = self.services.mcp_connection_manager.load_full(); refreshed_manager.set_elicitations_auto_deny(current_manager.elicitations_auto_deny()); } - let mut old_manager = { - let mut manager = self.services.mcp_connection_manager.write().await; - std::mem::replace(&mut *manager, refreshed_manager) - }; - old_manager.shutdown().await; + self.services + .mcp_connection_manager + .store(Arc::new(refreshed_manager)); } pub(crate) async fn refresh_mcp_servers_if_requested( diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 2e5d5643d..7cc091e43 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -793,7 +793,7 @@ impl Codex { ..Default::default() }) .await?; - let mcp_connection_manager = self.session.services.mcp_connection_manager.read().await; + let mcp_connection_manager = self.session.services.mcp_connection_manager.load_full(); mcp_connection_manager.set_elicitations_auto_deny(mcp_elicitations_auto_deny); Ok(()) } @@ -2748,10 +2748,6 @@ impl Session { } } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP app context rendering reads through the session-owned manager guard" - )] pub(crate) async fn build_initial_context( &self, turn_context: &TurnContext, @@ -2844,7 +2840,7 @@ impl Session { } } if turn_context.config.include_apps_instructions && turn_context.apps_enabled() { - let mcp_connection_manager = self.services.mcp_connection_manager.read().await; + let mcp_connection_manager = self.services.mcp_connection_manager.load_full(); let accessible_and_enabled_connectors = connectors::list_accessible_and_enabled_connectors_from_manager( &mcp_connection_manager, diff --git 
a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 98d19957c..655454a32 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -981,13 +981,13 @@ impl Session { // before any MCP-related events. It is reasonable to consider // changing this to use Option or OnceCell, though the current // setup is straightforward enough and performs well. - mcp_connection_manager: Arc::new(RwLock::new( + mcp_connection_manager: arc_swap::ArcSwap::from_pointee( McpConnectionManager::new_uninitialized_with_permission_profile( &config.permissions.approval_policy, config.permissions.permission_profile(), config.prefix_mcp_tool_names(), ), - )), + ), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index b83217293..8d8208205 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -319,8 +319,7 @@ async fn request_mcp_server_elicitation_auto_accepts_when_auto_deny_is_enabled() session .services .mcp_connection_manager - .read() - .await + .load_full() .set_elicitations_auto_deny(/*auto_deny*/ true); let requested_schema: McpElicitationSchema = serde_json::from_value(json!({ @@ -4816,13 +4815,13 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { ); let services = SessionServices { - mcp_connection_manager: Arc::new(RwLock::new( + mcp_connection_manager: arc_swap::ArcSwap::from_pointee( McpConnectionManager::new_uninitialized_with_permission_profile( &config.permissions.approval_policy, config.permissions.permission_profile(), config.prefix_mcp_tool_names(), ), - )), + ), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -6894,13 +6893,13 @@ 
where ); let services = SessionServices { - mcp_connection_manager: Arc::new(RwLock::new( + mcp_connection_manager: arc_swap::ArcSwap::from_pointee( McpConnectionManager::new_uninitialized_with_permission_profile( &config.permissions.approval_policy, config.permissions.permission_profile(), config.prefix_mcp_tool_names(), ), - )), + ), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -9440,18 +9439,13 @@ async fn abort_review_task_emits_exited_then_aborted_and_records_history() { } #[tokio::test] -#[expect( - clippy::await_holding_invalid_type, - reason = "test builds a router from session-owned MCP manager state" -)] async fn fatal_tool_error_stops_turn_and_reports_error() { let (session, turn_context, _rx) = make_session_and_context_with_rx().await; let tools = { session .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_tools() .await }; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 95d24dc03..722957d30 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -432,10 +432,6 @@ async fn run_hooks_and_record_inputs( blocked_input && !accepted_user_input } -#[expect( - clippy::await_holding_invalid_type, - reason = "MCP tool listing borrows the read guard across cancellation-aware await" -)] #[instrument(level = "trace", skip_all)] async fn build_skills_and_plugins( sess: &Arc, @@ -473,8 +469,7 @@ async fn build_skills_and_plugins( match sess .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_tools() .or_cancel(cancellation_token) .await @@ -1078,10 +1073,6 @@ async fn run_sampling_request( } } -#[expect( - clippy::await_holding_invalid_type, - reason = "tool router construction reads through the session-owned manager guard" -)] #[instrument(level = "trace", skip_all, fields( @@ -1095,18 +1086,12 @@ pub(crate) async fn 
built_tools( turn_context: &TurnContext, cancellation_token: &CancellationToken, ) -> CodexResult> { - let mcp_connection_manager = sess - .services - .mcp_connection_manager - .read() - .instrument(trace_span!("read_mcp_connection_manager")) - .await; + let mcp_connection_manager = sess.services.mcp_connection_manager.load_full(); let has_mcp_servers = mcp_connection_manager.has_servers(); let all_mcp_tools = mcp_connection_manager .list_all_tools() .or_cancel(cancellation_token) .await?; - drop(mcp_connection_manager); let loaded_plugins = sess .services .plugins_manager diff --git a/codex-rs/core/src/session/turn_context.rs b/codex-rs/core/src/session/turn_context.rs index e3bd01081..10eff3f3b 100644 --- a/codex-rs/core/src/session/turn_context.rs +++ b/codex-rs/core/src/session/turn_context.rs @@ -724,7 +724,7 @@ impl Session { .unwrap_or_else(|| session_configuration.cwd().clone()); let per_turn_config = Self::build_per_turn_config(&session_configuration, cwd.clone()); { - let mcp_connection_manager = self.services.mcp_connection_manager.read().await; + let mcp_connection_manager = self.services.mcp_connection_manager.load_full(); mcp_connection_manager.set_approval_policy(&session_configuration.approval_policy); mcp_connection_manager .set_permission_profile(session_configuration.permission_profile()); diff --git a/codex-rs/core/src/session_startup_prewarm.rs b/codex-rs/core/src/session_startup_prewarm.rs index ba2cc42f2..f4948ae2f 100644 --- a/codex-rs/core/src/session_startup_prewarm.rs +++ b/codex-rs/core/src/session_startup_prewarm.rs @@ -4,6 +4,7 @@ use std::time::Instant; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use tokio_util::task::AbortOnDropHandle; use tracing::info; use tracing::warn; @@ -19,7 +20,7 @@ use codex_protocol::error::Result as CodexResult; use codex_protocol::models::BaseInstructions; pub(crate) struct SessionStartupPrewarmHandle { - task: JoinHandle>, + task: AbortOnDropHandle>, started_at: Instant, 
timeout: Duration, } @@ -40,12 +41,17 @@ impl SessionStartupPrewarmHandle { timeout: Duration, ) -> Self { Self { - task, + task: AbortOnDropHandle::new(task), started_at, timeout, } } + pub(crate) async fn abort(self) { + self.task.abort(); + let _ = self.task.await; + } + async fn resolve( self, session_telemetry: &SessionTelemetry, diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 9bc4d7391..b4c499118 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -35,12 +35,12 @@ use codex_thread_store::ThreadStore; use std::path::PathBuf; use tokio::runtime::Handle; use tokio::sync::Mutex; -use tokio::sync::RwLock; use tokio::sync::watch; use tokio_util::sync::CancellationToken; pub(crate) struct SessionServices { - pub(crate) mcp_connection_manager: Arc>, + /// The latest manager; callers retain an owned handle while performing MCP I/O. + pub(crate) mcp_connection_manager: ArcSwap, pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, #[cfg_attr(not(unix), allow(dead_code))] @@ -87,18 +87,13 @@ pub(crate) struct SessionServices { impl SessionServices { /// Installs the manager before validating required servers so startup-time elicitation can /// resolve through the session's manager while validation waits. 
- #[expect( - clippy::await_holding_invalid_type, - reason = "required MCP validation keeps the installed manager reachable for startup-time elicitation" - )] pub(crate) async fn install_mcp_connection_manager( &self, manager: McpConnectionManager, ) -> Result<()> { - *self.mcp_connection_manager.write().await = manager; + self.mcp_connection_manager.store(Arc::new(manager)); self.mcp_connection_manager - .read() - .await + .load_full() .validate_required_servers() .await } diff --git a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs index c46df13b1..e96aad755 100644 --- a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs +++ b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs @@ -40,10 +40,6 @@ impl ToolExecutor for ListMcpResourceTemplatesHandler { true } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource template listing reads through the session-owned manager guard" - )] async fn handle( &self, invocation: ToolInvocation, @@ -107,8 +103,7 @@ impl ToolExecutor for ListMcpResourceTemplatesHandler { let templates = session .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_resource_templates() .await; Ok(ListResourceTemplatesPayload::from_all_servers(templates)) diff --git a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs index f7fcad342..1b052184e 100644 --- a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs +++ b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs @@ -40,10 +40,6 @@ impl ToolExecutor for ListMcpResourcesHandler { true } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource listing reads through the session-owned manager guard" - )] async fn handle( &self, invocation: ToolInvocation, @@ -105,8 
+101,7 @@ impl ToolExecutor for ListMcpResourcesHandler { let resources = session .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_resources() .await; Ok(ListResourcesPayload::from_all_servers(resources)) diff --git a/codex-rs/core/src/tools/handlers/request_plugin_install.rs b/codex-rs/core/src/tools/handlers/request_plugin_install.rs index 3f7bd92d2..ce5bb3fb1 100644 --- a/codex-rs/core/src/tools/handlers/request_plugin_install.rs +++ b/codex-rs/core/src/tools/handlers/request_plugin_install.rs @@ -304,10 +304,6 @@ fn is_remote_plugin_install_suggestion(plugin_id: &str) -> bool { .is_some_and(|(_, marketplace_name)| marketplace_name == REMOTE_GLOBAL_MARKETPLACE_NAME) } -#[expect( - clippy::await_holding_invalid_type, - reason = "connector cache refresh reads through the session-owned manager guard" -)] async fn refresh_missing_requested_connectors( session: &crate::session::session::Session, turn: &crate::session::turn_context::TurnContext, @@ -319,7 +315,7 @@ async fn refresh_missing_requested_connectors( return Some(Vec::new()); } - let manager = session.services.mcp_connection_manager.read().await; + let manager = session.services.mcp_connection_manager.load_full(); let mcp_tools = manager.list_all_tools().await; let accessible_connectors = connectors::with_app_enabled_state( connectors::accessible_connectors_from_mcp_tools(&mcp_tools), diff --git a/codex-rs/core/src/tools/router_tests.rs b/codex-rs/core/src/tools/router_tests.rs index cece7fc17..7c3e0157b 100644 --- a/codex-rs/core/src/tools/router_tests.rs +++ b/codex-rs/core/src/tools/router_tests.rs @@ -95,17 +95,12 @@ fn extension_tool_test_registry() -> Arc> { } #[tokio::test] -#[expect( - clippy::await_holding_invalid_type, - reason = "test builds a router from session-owned MCP manager state" -)] async fn parallel_support_does_not_match_namespaced_local_tool_names() -> anyhow::Result<()> { let (session, turn) = make_session_and_context().await; let mcp_tools = session 
.services .mcp_connection_manager - .read() - .await + .load_full() .list_all_tools() .await; let router = ToolRouter::from_turn_context( diff --git a/codex-rs/core/tests/suite/rmcp_client.rs b/codex-rs/core/tests/suite/rmcp_client.rs index 5cb61fa4a..c47aa0970 100644 --- a/codex-rs/core/tests/suite/rmcp_client.rs +++ b/codex-rs/core/tests/suite/rmcp_client.rs @@ -558,6 +558,52 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn shutdown_cancels_startup_prewarm_waiting_for_mcp_startup() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_websocket_server(vec![vec![vec![ + responses::ev_response_created("warm-1"), + responses::ev_completed("warm-1"), + ]]]) + .await; + let pending_mcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; + let pending_mcp_url = format!("http://{}/mcp", pending_mcp_listener.local_addr()?); + + let fixture = test_codex() + .with_config(move |config| { + insert_mcp_server( + config, + "shutdown_prewarm", + McpServerTransportConfig::StreamableHttp { + url: pending_mcp_url, + bearer_token_env_var: None, + http_headers: None, + env_http_headers: None, + }, + TestMcpServerOptions::default(), + ); + }) + .build_with_websocket_server(&server) + .await?; + + let (_pending_mcp_connection, _) = + tokio::time::timeout(Duration::from_secs(5), pending_mcp_listener.accept()) + .await + .context("startup prewarm should start the MCP connection")??; + tokio::time::timeout(Duration::from_secs(2), fixture.codex.shutdown_and_wait()) + .await + .context("shutdown should not wait for startup prewarm MCP startup")??; + tokio::time::sleep(Duration::from_millis(100)).await; + assert!( + server.connections().is_empty(), + "startup prewarm should not send a websocket request after shutdown" + ); + + server.shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[serial(mcp_cwd)] async fn 
stdio_server_uses_configured_cwd_before_runtime_fallback() -> anyhow::Result<()> {