diff --git a/codex-rs/codex-mcp/src/connection_manager.rs b/codex-rs/codex-mcp/src/connection_manager.rs index 5b7b6b3e8..659d35718 100644 --- a/codex-rs/codex-mcp/src/connection_manager.rs +++ b/codex-rs/codex-mcp/src/connection_manager.rs @@ -348,22 +348,12 @@ impl McpConnectionManager { !self.clients.is_empty() } - /// Drain all MCP clients from this manager and return a future that stops - /// them and terminates their stdio server processes. - pub fn begin_shutdown(&mut self) -> impl std::future::Future + Send + 'static { - self.startup_cancellation_token.cancel(); - let clients = std::mem::take(&mut self.clients); - self.server_metadata.clear(); - async move { - for client in clients.into_values() { - client.shutdown().await; - } - } - } - /// Stop all MCP clients owned by this manager and terminate stdio server processes. - pub async fn shutdown(&mut self) { - self.begin_shutdown().await; + pub async fn shutdown(&self) { + self.startup_cancellation_token.cancel(); + for client in self.clients.values() { + client.shutdown().await; + } } pub fn server_origin(&self, server_name: &str) -> Option<&str> { diff --git a/codex-rs/codex-mcp/src/connection_manager_tests.rs b/codex-rs/codex-mcp/src/connection_manager_tests.rs index 4b90337ac..ea405ced9 100644 --- a/codex-rs/codex-mcp/src/connection_manager_tests.rs +++ b/codex-rs/codex-mcp/src/connection_manager_tests.rs @@ -980,6 +980,48 @@ async fn list_all_tools_blocks_while_client_is_pending_without_cached_tool_info_ assert!(timeout_result.is_err()); } +#[tokio::test] +async fn shutdown_cancels_pending_tool_listing() { + let cancel_token = CancellationToken::new(); + let cancel_token_for_startup = cancel_token.clone(); + let (started_tx, started_rx) = tokio::sync::oneshot::channel(); + let pending_client = async move { + let _ = started_tx.send(()); + cancel_token_for_startup.cancelled().await; + Err(StartupOutcomeError::Cancelled) + } + .boxed() + .shared(); + let approval_policy = Constrained::allow_any(AskForApproval::OnFailure); + let permission_profile = Constrained::allow_any(PermissionProfile::default()); + let mut manager = McpConnectionManager::new_uninitialized( + &approval_policy, + &permission_profile, + /*prefix_mcp_tool_names*/ true, + ); + manager.clients.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + AsyncManagedClient { + client: pending_client, + cached_tool_info_snapshot: None, + cached_server_info: None, + startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)), + tool_plugin_provenance: Arc::new(ToolPluginProvenance::default()), + cancel_token, + }, + ); + let manager = Arc::new(manager); + let manager_for_list = Arc::clone(&manager); + let list_task = tokio::spawn(async move { manager_for_list.list_all_tools().await }); + + started_rx.await.expect("tool listing should start"); + tokio::time::timeout(Duration::from_secs(1), manager.shutdown()) + .await + .expect("shutdown should cancel speculative tool listing"); + let tools = list_task.await.expect("tool listing task should not panic"); + assert!(tools.is_empty()); +} + #[tokio::test] async fn list_all_tools_does_not_block_when_cached_tool_info_snapshot_is_empty() { let pending_client = futures::future::pending::>() diff --git a/codex-rs/core/src/codex_delegate_tests.rs b/codex-rs/core/src/codex_delegate_tests.rs index 7dabb09d3..cb7767a16 100644 --- a/codex-rs/core/src/codex_delegate_tests.rs +++ b/codex-rs/core/src/codex_delegate_tests.rs @@ -452,7 +452,7 @@ async fn delegated_mcp_guardian_abort_returns_synthetic_decline_answer() { } #[tokio::test] -async fn delegated_mcp_user_reviewer_waits_for_metadata_lookup() { +async fn delegated_mcp_user_reviewer_returns_none_without_metadata() { let (parent_session, parent_ctx, _rx_events) = crate::session::tests::make_session_and_context_with_rx().await; let pending_mcp_invocations = Arc::new(Mutex::new(HashMap::from([( @@ -464,21 +464,6 @@ async fn delegated_mcp_user_reviewer_waits_for_metadata_lookup() { }, )]))); let cancel_token = CancellationToken::new(); - let manager = Arc::clone(&parent_session.services.mcp_connection_manager); - let (manager_locked_tx, manager_locked_rx) = std::sync::mpsc::sync_channel(0); - let (release_manager_tx, release_manager_rx) = std::sync::mpsc::sync_channel(0); - let manager_lock = tokio::task::spawn_blocking(move || { - let _manager_guard = manager.blocking_write(); - manager_locked_tx - .send(()) - .expect("manager lock receiver should remain open"); - release_manager_rx - .recv() - .expect("manager lock release sender should remain open"); - }); - manager_locked_rx - .recv_timeout(Duration::from_secs(1)) - .expect("manager write lock should be acquired"); let event = RequestUserInputEvent { call_id: "call-1".to_string(), @@ -498,24 +483,7 @@ async fn delegated_mcp_user_reviewer_waits_for_metadata_lookup() { &pending_mcp_invocations, &event, &cancel_token, - ); - tokio::pin!(response); - assert!( - timeout(Duration::from_millis(100), &mut response) - .await - .is_err(), - "manual reviewer should wait for MCP metadata" - ); - release_manager_tx - .send(()) - .expect("manager lock holder should remain open"); - manager_lock - .await - .expect("manager lock task should not panic"); - assert_eq!( - timeout(Duration::from_secs(1), response) - .await - .expect("manual reviewer should finish after MCP metadata lookup"), - None - ); + ) + .await; + assert_eq!(response, None); } diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs index 6d6ca52d8..d8df72855 100644 --- a/codex-rs/core/src/connectors.rs +++ b/codex-rs/core/src/connectors.rs @@ -287,7 +287,7 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_mcp_manager( drop(rx_event); let cancel_token = CancellationToken::new(); - let mut mcp_connection_manager = McpConnectionManager::new( + let mcp_connection_manager = McpConnectionManager::new( &mcp_servers, config.mcp_oauth_credentials_store_mode, auth_status_entries, diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index b9a8d4ff0..e5273a666 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -319,8 +319,7 @@ async fn handle_approved_mcp_tool_call( let server_origin = sess .services .mcp_connection_manager - .read() - .await + .load_full() .server_origin(&server) .map(str::to_string); @@ -606,8 +605,7 @@ async fn maybe_request_codex_apps_auth_elicitation( if !sess .services .mcp_connection_manager - .read() - .await + .load_full() .is_host_owned_codex_apps_server(server) { return result; @@ -669,13 +667,9 @@ async fn maybe_request_codex_apps_auth_elicitation( auth_elicitation_completed_result(&plan.auth_failure, result.meta) } -#[expect( - clippy::await_holding_invalid_type, - reason = "Codex Apps cache refresh reads through the session-owned manager guard" -)] async fn refresh_codex_apps_after_connector_auth(sess: &Session, turn_context: &TurnContext) { let mcp_tools_result = { - let manager = sess.services.mcp_connection_manager.read().await; + let manager = sess.services.mcp_connection_manager.load_full(); manager.hard_refresh_codex_apps_tools_cache().await }; @@ -694,10 +688,6 @@ async fn refresh_codex_apps_after_connector_auth(sess: &Session, turn_context: & } } -#[expect( - clippy::await_holding_invalid_type, - reason = "MCP sandbox metadata reads through the session-owned manager guard" -)] async fn augment_mcp_tool_request_meta_with_sandbox_state( sess: &Session, turn_context: &TurnContext, @@ -707,8 +697,7 @@ async fn augment_mcp_tool_request_meta_with_sandbox_state( let supports_sandbox_state_meta = sess .services .mcp_connection_manager - .read() - .await + .load_full() .server_supports_sandbox_state_meta_capability(server) .await .unwrap_or(false); @@ -757,8 +746,7 @@ async fn maybe_mark_thread_memory_mode_polluted( let pollutes_memory = sess .services .mcp_connection_manager - .read() - .await + .load_full() .server_pollutes_memory(server); if !pollutes_memory { return; @@ -1414,17 +1402,13 @@ async fn mcp_tool_approval_decision_from_guardian( } } -#[expect( - clippy::await_holding_invalid_type, - reason = "MCP approval metadata reads through the session-owned manager guard" -)] pub(crate) async fn lookup_mcp_tool_metadata( sess: &Session, turn_context: &TurnContext, server: &str, tool_name: &str, ) -> Option { - let manager = sess.services.mcp_connection_manager.read().await; + let manager = sess.services.mcp_connection_manager.load_full(); let plugin_id = manager .plugin_id_for_mcp_server_name(server) .map(str::to_string); @@ -1509,10 +1493,6 @@ fn get_mcp_app_resource_uri( }) } -#[expect( - clippy::await_holding_invalid_type, - reason = "MCP app metadata reads through the session-owned manager guard" -)] async fn lookup_mcp_app_usage_metadata( sess: &Session, server: &str, @@ -1521,8 +1501,7 @@ async fn lookup_mcp_app_usage_metadata( let tools = sess .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_tools() .await; diff --git a/codex-rs/core/src/mcp_tool_call_tests.rs b/codex-rs/core/src/mcp_tool_call_tests.rs index da54fe187..723948b7b 100644 --- a/codex-rs/core/src/mcp_tool_call_tests.rs +++ b/codex-rs/core/src/mcp_tool_call_tests.rs @@ -1280,7 +1280,10 @@ async fn install_host_owned_codex_apps_manager(session: &Session, turn_context: /*elicitation_reviewer*/ None, ) .await; - *session.services.mcp_connection_manager.write().await = manager; + session + .services + .mcp_connection_manager + .store(Arc::new(manager)); } #[tokio::test] diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index 10b0b27cc..bfb87a0ff 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -580,6 +580,9 @@ pub async fn set_thread_memory_mode(sess: &Arc, sub_id: String, mode: T } async fn shutdown_session_runtime(sess: &Arc) { + if let Some(startup_prewarm) = sess.take_session_startup_prewarm().await { + startup_prewarm.abort().await; + } sess.abort_all_tasks(TurnAbortReason::Interrupted).await; let _ = sess.conversation.shutdown().await; sess.services @@ -589,11 +592,11 @@ async fn shutdown_session_runtime(sess: &Arc) { if let Err(err) = sess.services.code_mode_service.shutdown().await { warn!("failed to shutdown code mode session: {err}"); } - let mcp_shutdown = { - let mut manager = sess.services.mcp_connection_manager.write().await; - manager.begin_shutdown() - }; - mcp_shutdown.await; + sess.services + .mcp_connection_manager + .load_full() + .shutdown() + .await; sess.guardian_review_session.shutdown().await; } diff --git a/codex-rs/core/src/session/mcp.rs b/codex-rs/core/src/session/mcp.rs index 779813641..39333801c 100644 --- a/codex-rs/core/src/session/mcp.rs +++ b/codex-rs/core/src/session/mcp.rs @@ -91,8 +91,7 @@ impl Session { if self .services .mcp_connection_manager - .read() - .await + .load_full() .elicitations_auto_deny() { return McpServerElicitationOutcome { @@ -226,16 +225,11 @@ impl Session { self.services .mcp_connection_manager - .read() - .await + .load_full() .resolve_elicitation(server_name, id, response) .await } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource calls are serialized through the session-owned manager guard" - )] pub async fn list_resources( &self, server: &str, @@ -243,16 +237,11 @@ impl Session { ) -> anyhow::Result { self.services .mcp_connection_manager - .read() - .await + .load_full() .list_resources(server, params) .await } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource calls are serialized through the session-owned manager guard" - )] pub async fn list_resource_templates( &self, server: &str, @@ -260,16 +249,11 @@ impl Session { ) -> anyhow::Result { self.services .mcp_connection_manager - .read() - .await + .load_full() .list_resource_templates(server, params) .await } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource calls are serialized through the session-owned manager guard" - )] pub async fn read_resource( &self, server: &str, @@ -277,16 +261,11 @@ impl Session { ) -> anyhow::Result { self.services .mcp_connection_manager - .read() - .await + .load_full() .read_resource(server, params) .await } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP tool calls are serialized through the session-owned manager guard" - )] pub async fn call_tool( &self, server: &str, @@ -296,8 +275,7 @@ impl Session { ) -> anyhow::Result { self.services .mcp_connection_manager - .read() - .await + .load_full() .call_tool(server, tool, arguments, meta) .await } @@ -366,14 +344,12 @@ impl Session { ) .await; { - let current_manager = self.services.mcp_connection_manager.read().await; + let current_manager = self.services.mcp_connection_manager.load_full(); refreshed_manager.set_elicitations_auto_deny(current_manager.elicitations_auto_deny()); } - let mut old_manager = { - let mut manager = self.services.mcp_connection_manager.write().await; - std::mem::replace(&mut *manager, refreshed_manager) - }; - old_manager.shutdown().await; + self.services + .mcp_connection_manager + .store(Arc::new(refreshed_manager)); } pub(crate) async fn refresh_mcp_servers_if_requested( diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 2e5d5643d..7cc091e43 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -793,7 +793,7 @@ impl Codex { ..Default::default() }) .await?; - let mcp_connection_manager = self.session.services.mcp_connection_manager.read().await; + let mcp_connection_manager = self.session.services.mcp_connection_manager.load_full(); mcp_connection_manager.set_elicitations_auto_deny(mcp_elicitations_auto_deny); Ok(()) } @@ -2748,10 +2748,6 @@ impl Session { } } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP app context rendering reads through the session-owned manager guard" - )] pub(crate) async fn build_initial_context( &self, turn_context: &TurnContext, @@ -2844,7 +2840,7 @@ impl Session { } } if turn_context.config.include_apps_instructions && turn_context.apps_enabled() { - let mcp_connection_manager = self.services.mcp_connection_manager.read().await; + let mcp_connection_manager = self.services.mcp_connection_manager.load_full(); let accessible_and_enabled_connectors = connectors::list_accessible_and_enabled_connectors_from_manager( &mcp_connection_manager, diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 98d19957c..655454a32 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -981,13 +981,13 @@ impl Session { // before any MCP-related events. It is reasonable to consider // changing this to use Option or OnceCell, though the current // setup is straightforward enough and performs well. - mcp_connection_manager: Arc::new(RwLock::new( + mcp_connection_manager: arc_swap::ArcSwap::from_pointee( McpConnectionManager::new_uninitialized_with_permission_profile( &config.permissions.approval_policy, config.permissions.permission_profile(), config.prefix_mcp_tool_names(), ), - )), + ), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index b83217293..8d8208205 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -319,8 +319,7 @@ async fn request_mcp_server_elicitation_auto_accepts_when_auto_deny_is_enabled() session .services .mcp_connection_manager - .read() - .await + .load_full() .set_elicitations_auto_deny(/*auto_deny*/ true); let requested_schema: McpElicitationSchema = serde_json::from_value(json!({ @@ -4816,13 +4815,13 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { ); let services = SessionServices { - mcp_connection_manager: Arc::new(RwLock::new( + mcp_connection_manager: arc_swap::ArcSwap::from_pointee( McpConnectionManager::new_uninitialized_with_permission_profile( &config.permissions.approval_policy, config.permissions.permission_profile(), config.prefix_mcp_tool_names(), ), - )), + ), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -6894,13 +6893,13 @@ where ); let services = SessionServices { - mcp_connection_manager: Arc::new(RwLock::new( + mcp_connection_manager: arc_swap::ArcSwap::from_pointee( McpConnectionManager::new_uninitialized_with_permission_profile( &config.permissions.approval_policy, config.permissions.permission_profile(), config.prefix_mcp_tool_names(), ), - )), + ), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -9440,18 +9439,13 @@ async fn abort_review_task_emits_exited_then_aborted_and_records_history() { } #[tokio::test] -#[expect( - clippy::await_holding_invalid_type, - reason = "test builds a router from session-owned MCP manager state" -)] async fn fatal_tool_error_stops_turn_and_reports_error() { let (session, turn_context, _rx) = make_session_and_context_with_rx().await; let tools = { session .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_tools() .await }; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 95d24dc03..722957d30 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -432,10 +432,6 @@ async fn run_hooks_and_record_inputs( blocked_input && !accepted_user_input } -#[expect( - clippy::await_holding_invalid_type, - reason = "MCP tool listing borrows the read guard across cancellation-aware await" -)] #[instrument(level = "trace", skip_all)] async fn build_skills_and_plugins( sess: &Arc, @@ -473,8 +469,7 @@ async fn build_skills_and_plugins( match sess .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_tools() .or_cancel(cancellation_token) .await @@ -1078,10 +1073,6 @@ async fn run_sampling_request( } } -#[expect( - clippy::await_holding_invalid_type, - reason = "tool router construction reads through the session-owned manager guard" -)] #[instrument(level = "trace", skip_all, fields( @@ -1095,18 +1086,12 @@ pub(crate) async fn built_tools( turn_context: &TurnContext, cancellation_token: &CancellationToken, ) -> CodexResult> { - let mcp_connection_manager = sess - .services - .mcp_connection_manager - .read() - .instrument(trace_span!("read_mcp_connection_manager")) - .await; + let mcp_connection_manager = sess.services.mcp_connection_manager.load_full(); let has_mcp_servers = mcp_connection_manager.has_servers(); let all_mcp_tools = mcp_connection_manager .list_all_tools() .or_cancel(cancellation_token) .await?; - drop(mcp_connection_manager); let loaded_plugins = sess .services .plugins_manager diff --git a/codex-rs/core/src/session/turn_context.rs b/codex-rs/core/src/session/turn_context.rs index e3bd01081..10eff3f3b 100644 --- a/codex-rs/core/src/session/turn_context.rs +++ b/codex-rs/core/src/session/turn_context.rs @@ -724,7 +724,7 @@ impl Session { .unwrap_or_else(|| session_configuration.cwd().clone()); let per_turn_config = Self::build_per_turn_config(&session_configuration, cwd.clone()); { - let mcp_connection_manager = self.services.mcp_connection_manager.read().await; + let mcp_connection_manager = self.services.mcp_connection_manager.load_full(); mcp_connection_manager.set_approval_policy(&session_configuration.approval_policy); mcp_connection_manager .set_permission_profile(session_configuration.permission_profile()); diff --git a/codex-rs/core/src/session_startup_prewarm.rs b/codex-rs/core/src/session_startup_prewarm.rs index ba2cc42f2..f4948ae2f 100644 --- a/codex-rs/core/src/session_startup_prewarm.rs +++ b/codex-rs/core/src/session_startup_prewarm.rs @@ -4,6 +4,7 @@ use std::time::Instant; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use tokio_util::task::AbortOnDropHandle; use tracing::info; use tracing::warn; @@ -19,7 +20,7 @@ use codex_protocol::error::Result as CodexResult; use codex_protocol::models::BaseInstructions; pub(crate) struct SessionStartupPrewarmHandle { - task: JoinHandle>, + task: AbortOnDropHandle>, started_at: Instant, timeout: Duration, } @@ -40,12 +41,17 @@ impl SessionStartupPrewarmHandle { timeout: Duration, ) -> Self { Self { - task, + task: AbortOnDropHandle::new(task), started_at, timeout, } } + pub(crate) async fn abort(self) { + self.task.abort(); + let _ = self.task.await; + } + async fn resolve( self, session_telemetry: &SessionTelemetry, diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 9bc4d7391..b4c499118 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -35,12 +35,12 @@ use codex_thread_store::ThreadStore; use std::path::PathBuf; use tokio::runtime::Handle; use tokio::sync::Mutex; -use tokio::sync::RwLock; use tokio::sync::watch; use tokio_util::sync::CancellationToken; pub(crate) struct SessionServices { - pub(crate) mcp_connection_manager: Arc>, + /// The latest manager; callers retain an owned handle while performing MCP I/O. + pub(crate) mcp_connection_manager: ArcSwap, pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, #[cfg_attr(not(unix), allow(dead_code))] @@ -87,18 +87,13 @@ pub(crate) struct SessionServices { impl SessionServices { /// Installs the manager before validating required servers so startup-time elicitation can /// resolve through the session's manager while validation waits. - #[expect( - clippy::await_holding_invalid_type, - reason = "required MCP validation keeps the installed manager reachable for startup-time elicitation" - )] pub(crate) async fn install_mcp_connection_manager( &self, manager: McpConnectionManager, ) -> Result<()> { - *self.mcp_connection_manager.write().await = manager; + self.mcp_connection_manager.store(Arc::new(manager)); self.mcp_connection_manager - .read() - .await + .load_full() .validate_required_servers() .await } diff --git a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs index c46df13b1..e96aad755 100644 --- a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs +++ b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs @@ -40,10 +40,6 @@ impl ToolExecutor for ListMcpResourceTemplatesHandler { true } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource template listing reads through the session-owned manager guard" - )] async fn handle( &self, invocation: ToolInvocation, @@ -107,8 +103,7 @@ impl ToolExecutor for ListMcpResourceTemplatesHandler { let templates = session .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_resource_templates() .await; Ok(ListResourceTemplatesPayload::from_all_servers(templates)) diff --git a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs index f7fcad342..1b052184e 100644 --- a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs +++ b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs @@ -40,10 +40,6 @@ impl ToolExecutor for ListMcpResourcesHandler { true } - #[expect( - clippy::await_holding_invalid_type, - reason = "MCP resource listing reads through the session-owned manager guard" - )] async fn handle( &self, invocation: ToolInvocation, @@ -105,8 +101,7 @@ impl ToolExecutor for ListMcpResourcesHandler { let resources = session .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_resources() .await; Ok(ListResourcesPayload::from_all_servers(resources)) diff --git a/codex-rs/core/src/tools/handlers/request_plugin_install.rs b/codex-rs/core/src/tools/handlers/request_plugin_install.rs index 3f7bd92d2..ce5bb3fb1 100644 --- a/codex-rs/core/src/tools/handlers/request_plugin_install.rs +++ b/codex-rs/core/src/tools/handlers/request_plugin_install.rs @@ -304,10 +304,6 @@ fn is_remote_plugin_install_suggestion(plugin_id: &str) -> bool { .is_some_and(|(_, marketplace_name)| marketplace_name == REMOTE_GLOBAL_MARKETPLACE_NAME) } -#[expect( - clippy::await_holding_invalid_type, - reason = "connector cache refresh reads through the session-owned manager guard" -)] async fn refresh_missing_requested_connectors( session: &crate::session::session::Session, turn: &crate::session::turn_context::TurnContext, @@ -319,7 +315,7 @@ async fn refresh_missing_requested_connectors( return Some(Vec::new()); } - let manager = session.services.mcp_connection_manager.read().await; + let manager = session.services.mcp_connection_manager.load_full(); let mcp_tools = manager.list_all_tools().await; let accessible_connectors = connectors::with_app_enabled_state( connectors::accessible_connectors_from_mcp_tools(&mcp_tools), diff --git a/codex-rs/core/src/tools/router_tests.rs b/codex-rs/core/src/tools/router_tests.rs index cece7fc17..7c3e0157b 100644 --- a/codex-rs/core/src/tools/router_tests.rs +++ b/codex-rs/core/src/tools/router_tests.rs @@ -95,17 +95,12 @@ fn extension_tool_test_registry() -> Arc> { } #[tokio::test] -#[expect( - clippy::await_holding_invalid_type, - reason = "test builds a router from session-owned MCP manager state" -)] async fn parallel_support_does_not_match_namespaced_local_tool_names() -> anyhow::Result<()> { let (session, turn) = make_session_and_context().await; let mcp_tools = session .services .mcp_connection_manager - .read() - .await + .load_full() .list_all_tools() .await; let router = ToolRouter::from_turn_context( diff --git a/codex-rs/core/tests/suite/rmcp_client.rs b/codex-rs/core/tests/suite/rmcp_client.rs index 5cb61fa4a..c47aa0970 100644 --- a/codex-rs/core/tests/suite/rmcp_client.rs +++ b/codex-rs/core/tests/suite/rmcp_client.rs @@ -558,6 +558,52 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn shutdown_cancels_startup_prewarm_waiting_for_mcp_startup() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_websocket_server(vec![vec![vec![ + responses::ev_response_created("warm-1"), + responses::ev_completed("warm-1"), + ]]]) + .await; + let pending_mcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; + let pending_mcp_url = format!("http://{}/mcp", pending_mcp_listener.local_addr()?); + + let fixture = test_codex() + .with_config(move |config| { + insert_mcp_server( + config, + "shutdown_prewarm", + McpServerTransportConfig::StreamableHttp { + url: pending_mcp_url, + bearer_token_env_var: None, + http_headers: None, + env_http_headers: None, + }, + TestMcpServerOptions::default(), + ); + }) + .build_with_websocket_server(&server) + .await?; + + let (_pending_mcp_connection, _) = + tokio::time::timeout(Duration::from_secs(5), pending_mcp_listener.accept()) + .await + .context("startup prewarm should start the MCP connection")??; + tokio::time::timeout(Duration::from_secs(2), fixture.codex.shutdown_and_wait()) + .await + .context("shutdown should not wait for startup prewarm MCP startup")??; + tokio::time::sleep(Duration::from_millis(100)).await; + assert!( + server.connections().is_empty(), + "startup prewarm should not send a websocket request after shutdown" + ); + + server.shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[serial(mcp_cwd)] async fn stdio_server_uses_configured_cwd_before_runtime_fallback() -> anyhow::Result<()> {