mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
Use latest-wins MCP manager replacement (#27259)
## Summary We originally addressed startup prewarming holding the read side of `RwLock<McpConnectionManager>` by snapshotting tool-list state. Review feedback identified the broader ownership problem: the outer synchronization should only publish or retrieve the current manager, while MCP operations rely on the manager's internal synchronization. A follow-up preserved operation retirement with a separate gate, but further review questioned whether that synchronization was actually required and whether we could support latest-wins replacement instead. This PR now stores the current MCP manager in `ArcSwap`. Each operation uses `load_full()` to obtain an owned `Arc<McpConnectionManager>`, then performs MCP I/O without retaining the publication mechanism. Refresh cancels obsolete startup work, constructs a replacement, and atomically publishes it. New operations see the latest manager, while operations that already loaded the previous manager retain a valid handle. Refresh happens at a turn boundary, so there should be no active user tool calls to drain. Git history supports dropping the outer `RwLock`. It was introduced in `03ffe4d595` on November 17, 2025 for non-blocking MCP startup: the session published an empty manager, startup initialized that same object while holding the write lock, and readers waited for initialization. `7cd2e84026` on February 19, 2026 removed that two-phase initialization in favor of constructing a fresh manager and swapping it in, explicitly noting that `Option` or `OnceCell` could replace the placeholder design. Hot reload later reused the existing lock to publish a replacement, but I found no indication that the lock was introduced to guarantee in-flight tool calls finish before refresh or shutdown. Terminal shutdown remains separate from refresh: it aborts startup prewarming and active tasks before shutting down the current manager, so tool calls may be interrupted and no model WebSocket work continues after shutdown. Focused regression coverage exercises pending tool-list cancellation, deferred refresh, and startup-prewarm shutdown.
This commit is contained in:
committed by
GitHub
Unverified
parent
d2f6d23c6c
commit
41b4fabbb4
@@ -348,22 +348,12 @@ impl McpConnectionManager {
|
||||
!self.clients.is_empty()
|
||||
}
|
||||
|
||||
/// Drain all MCP clients from this manager and return a future that stops
|
||||
/// them and terminates their stdio server processes.
|
||||
pub fn begin_shutdown(&mut self) -> impl std::future::Future<Output = ()> + Send + 'static {
|
||||
self.startup_cancellation_token.cancel();
|
||||
let clients = std::mem::take(&mut self.clients);
|
||||
self.server_metadata.clear();
|
||||
async move {
|
||||
for client in clients.into_values() {
|
||||
client.shutdown().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stop all MCP clients owned by this manager and terminate stdio server processes.
|
||||
pub async fn shutdown(&mut self) {
|
||||
self.begin_shutdown().await;
|
||||
pub async fn shutdown(&self) {
|
||||
self.startup_cancellation_token.cancel();
|
||||
for client in self.clients.values() {
|
||||
client.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn server_origin(&self, server_name: &str) -> Option<&str> {
|
||||
|
||||
@@ -980,6 +980,48 @@ async fn list_all_tools_blocks_while_client_is_pending_without_cached_tool_info_
|
||||
assert!(timeout_result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_cancels_pending_tool_listing() {
|
||||
let cancel_token = CancellationToken::new();
|
||||
let cancel_token_for_startup = cancel_token.clone();
|
||||
let (started_tx, started_rx) = tokio::sync::oneshot::channel();
|
||||
let pending_client = async move {
|
||||
let _ = started_tx.send(());
|
||||
cancel_token_for_startup.cancelled().await;
|
||||
Err(StartupOutcomeError::Cancelled)
|
||||
}
|
||||
.boxed()
|
||||
.shared();
|
||||
let approval_policy = Constrained::allow_any(AskForApproval::OnFailure);
|
||||
let permission_profile = Constrained::allow_any(PermissionProfile::default());
|
||||
let mut manager = McpConnectionManager::new_uninitialized(
|
||||
&approval_policy,
|
||||
&permission_profile,
|
||||
/*prefix_mcp_tool_names*/ true,
|
||||
);
|
||||
manager.clients.insert(
|
||||
CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
AsyncManagedClient {
|
||||
client: pending_client,
|
||||
cached_tool_info_snapshot: None,
|
||||
cached_server_info: None,
|
||||
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
tool_plugin_provenance: Arc::new(ToolPluginProvenance::default()),
|
||||
cancel_token,
|
||||
},
|
||||
);
|
||||
let manager = Arc::new(manager);
|
||||
let manager_for_list = Arc::clone(&manager);
|
||||
let list_task = tokio::spawn(async move { manager_for_list.list_all_tools().await });
|
||||
|
||||
started_rx.await.expect("tool listing should start");
|
||||
tokio::time::timeout(Duration::from_secs(1), manager.shutdown())
|
||||
.await
|
||||
.expect("shutdown should cancel speculative tool listing");
|
||||
let tools = list_task.await.expect("tool listing task should not panic");
|
||||
assert!(tools.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_all_tools_does_not_block_when_cached_tool_info_snapshot_is_empty() {
|
||||
let pending_client = futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
|
||||
|
||||
@@ -452,7 +452,7 @@ async fn delegated_mcp_guardian_abort_returns_synthetic_decline_answer() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delegated_mcp_user_reviewer_waits_for_metadata_lookup() {
|
||||
async fn delegated_mcp_user_reviewer_returns_none_without_metadata() {
|
||||
let (parent_session, parent_ctx, _rx_events) =
|
||||
crate::session::tests::make_session_and_context_with_rx().await;
|
||||
let pending_mcp_invocations = Arc::new(Mutex::new(HashMap::from([(
|
||||
@@ -464,21 +464,6 @@ async fn delegated_mcp_user_reviewer_waits_for_metadata_lookup() {
|
||||
},
|
||||
)])));
|
||||
let cancel_token = CancellationToken::new();
|
||||
let manager = Arc::clone(&parent_session.services.mcp_connection_manager);
|
||||
let (manager_locked_tx, manager_locked_rx) = std::sync::mpsc::sync_channel(0);
|
||||
let (release_manager_tx, release_manager_rx) = std::sync::mpsc::sync_channel(0);
|
||||
let manager_lock = tokio::task::spawn_blocking(move || {
|
||||
let _manager_guard = manager.blocking_write();
|
||||
manager_locked_tx
|
||||
.send(())
|
||||
.expect("manager lock receiver should remain open");
|
||||
release_manager_rx
|
||||
.recv()
|
||||
.expect("manager lock release sender should remain open");
|
||||
});
|
||||
manager_locked_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.expect("manager write lock should be acquired");
|
||||
|
||||
let event = RequestUserInputEvent {
|
||||
call_id: "call-1".to_string(),
|
||||
@@ -498,24 +483,7 @@ async fn delegated_mcp_user_reviewer_waits_for_metadata_lookup() {
|
||||
&pending_mcp_invocations,
|
||||
&event,
|
||||
&cancel_token,
|
||||
);
|
||||
tokio::pin!(response);
|
||||
assert!(
|
||||
timeout(Duration::from_millis(100), &mut response)
|
||||
.await
|
||||
.is_err(),
|
||||
"manual reviewer should wait for MCP metadata"
|
||||
);
|
||||
release_manager_tx
|
||||
.send(())
|
||||
.expect("manager lock holder should remain open");
|
||||
manager_lock
|
||||
.await
|
||||
.expect("manager lock task should not panic");
|
||||
assert_eq!(
|
||||
timeout(Duration::from_secs(1), response)
|
||||
.await
|
||||
.expect("manual reviewer should finish after MCP metadata lookup"),
|
||||
None
|
||||
);
|
||||
)
|
||||
.await;
|
||||
assert_eq!(response, None);
|
||||
}
|
||||
|
||||
@@ -287,7 +287,7 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_mcp_manager(
|
||||
drop(rx_event);
|
||||
|
||||
let cancel_token = CancellationToken::new();
|
||||
let mut mcp_connection_manager = McpConnectionManager::new(
|
||||
let mcp_connection_manager = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_status_entries,
|
||||
|
||||
@@ -319,8 +319,7 @@ async fn handle_approved_mcp_tool_call(
|
||||
let server_origin = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.server_origin(&server)
|
||||
.map(str::to_string);
|
||||
|
||||
@@ -606,8 +605,7 @@ async fn maybe_request_codex_apps_auth_elicitation(
|
||||
if !sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.is_host_owned_codex_apps_server(server)
|
||||
{
|
||||
return result;
|
||||
@@ -669,13 +667,9 @@ async fn maybe_request_codex_apps_auth_elicitation(
|
||||
auth_elicitation_completed_result(&plan.auth_failure, result.meta)
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "Codex Apps cache refresh reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn refresh_codex_apps_after_connector_auth(sess: &Session, turn_context: &TurnContext) {
|
||||
let mcp_tools_result = {
|
||||
let manager = sess.services.mcp_connection_manager.read().await;
|
||||
let manager = sess.services.mcp_connection_manager.load_full();
|
||||
manager.hard_refresh_codex_apps_tools_cache().await
|
||||
};
|
||||
|
||||
@@ -694,10 +688,6 @@ async fn refresh_codex_apps_after_connector_auth(sess: &Session, turn_context: &
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP sandbox metadata reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn augment_mcp_tool_request_meta_with_sandbox_state(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
@@ -707,8 +697,7 @@ async fn augment_mcp_tool_request_meta_with_sandbox_state(
|
||||
let supports_sandbox_state_meta = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.server_supports_sandbox_state_meta_capability(server)
|
||||
.await
|
||||
.unwrap_or(false);
|
||||
@@ -757,8 +746,7 @@ async fn maybe_mark_thread_memory_mode_polluted(
|
||||
let pollutes_memory = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.server_pollutes_memory(server);
|
||||
if !pollutes_memory {
|
||||
return;
|
||||
@@ -1414,17 +1402,13 @@ async fn mcp_tool_approval_decision_from_guardian(
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP approval metadata reads through the session-owned manager guard"
|
||||
)]
|
||||
pub(crate) async fn lookup_mcp_tool_metadata(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
server: &str,
|
||||
tool_name: &str,
|
||||
) -> Option<McpToolApprovalMetadata> {
|
||||
let manager = sess.services.mcp_connection_manager.read().await;
|
||||
let manager = sess.services.mcp_connection_manager.load_full();
|
||||
let plugin_id = manager
|
||||
.plugin_id_for_mcp_server_name(server)
|
||||
.map(str::to_string);
|
||||
@@ -1509,10 +1493,6 @@ fn get_mcp_app_resource_uri(
|
||||
})
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP app metadata reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn lookup_mcp_app_usage_metadata(
|
||||
sess: &Session,
|
||||
server: &str,
|
||||
@@ -1521,8 +1501,7 @@ async fn lookup_mcp_app_usage_metadata(
|
||||
let tools = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.list_all_tools()
|
||||
.await;
|
||||
|
||||
|
||||
@@ -1280,7 +1280,10 @@ async fn install_host_owned_codex_apps_manager(session: &Session, turn_context:
|
||||
/*elicitation_reviewer*/ None,
|
||||
)
|
||||
.await;
|
||||
*session.services.mcp_connection_manager.write().await = manager;
|
||||
session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.store(Arc::new(manager));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -580,6 +580,9 @@ pub async fn set_thread_memory_mode(sess: &Arc<Session>, sub_id: String, mode: T
|
||||
}
|
||||
|
||||
async fn shutdown_session_runtime(sess: &Arc<Session>) {
|
||||
if let Some(startup_prewarm) = sess.take_session_startup_prewarm().await {
|
||||
startup_prewarm.abort().await;
|
||||
}
|
||||
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
let _ = sess.conversation.shutdown().await;
|
||||
sess.services
|
||||
@@ -589,11 +592,11 @@ async fn shutdown_session_runtime(sess: &Arc<Session>) {
|
||||
if let Err(err) = sess.services.code_mode_service.shutdown().await {
|
||||
warn!("failed to shutdown code mode session: {err}");
|
||||
}
|
||||
let mcp_shutdown = {
|
||||
let mut manager = sess.services.mcp_connection_manager.write().await;
|
||||
manager.begin_shutdown()
|
||||
};
|
||||
mcp_shutdown.await;
|
||||
sess.services
|
||||
.mcp_connection_manager
|
||||
.load_full()
|
||||
.shutdown()
|
||||
.await;
|
||||
sess.guardian_review_session.shutdown().await;
|
||||
}
|
||||
|
||||
|
||||
@@ -91,8 +91,7 @@ impl Session {
|
||||
if self
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.elicitations_auto_deny()
|
||||
{
|
||||
return McpServerElicitationOutcome {
|
||||
@@ -226,16 +225,11 @@ impl Session {
|
||||
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.resolve_elicitation(server_name, id, response)
|
||||
.await
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP resource calls are serialized through the session-owned manager guard"
|
||||
)]
|
||||
pub async fn list_resources(
|
||||
&self,
|
||||
server: &str,
|
||||
@@ -243,16 +237,11 @@ impl Session {
|
||||
) -> anyhow::Result<ListResourcesResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.list_resources(server, params)
|
||||
.await
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP resource calls are serialized through the session-owned manager guard"
|
||||
)]
|
||||
pub async fn list_resource_templates(
|
||||
&self,
|
||||
server: &str,
|
||||
@@ -260,16 +249,11 @@ impl Session {
|
||||
) -> anyhow::Result<ListResourceTemplatesResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.list_resource_templates(server, params)
|
||||
.await
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP resource calls are serialized through the session-owned manager guard"
|
||||
)]
|
||||
pub async fn read_resource(
|
||||
&self,
|
||||
server: &str,
|
||||
@@ -277,16 +261,11 @@ impl Session {
|
||||
) -> anyhow::Result<ReadResourceResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.read_resource(server, params)
|
||||
.await
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP tool calls are serialized through the session-owned manager guard"
|
||||
)]
|
||||
pub async fn call_tool(
|
||||
&self,
|
||||
server: &str,
|
||||
@@ -296,8 +275,7 @@ impl Session {
|
||||
) -> anyhow::Result<CallToolResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.call_tool(server, tool, arguments, meta)
|
||||
.await
|
||||
}
|
||||
@@ -366,14 +344,12 @@ impl Session {
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let current_manager = self.services.mcp_connection_manager.read().await;
|
||||
let current_manager = self.services.mcp_connection_manager.load_full();
|
||||
refreshed_manager.set_elicitations_auto_deny(current_manager.elicitations_auto_deny());
|
||||
}
|
||||
let mut old_manager = {
|
||||
let mut manager = self.services.mcp_connection_manager.write().await;
|
||||
std::mem::replace(&mut *manager, refreshed_manager)
|
||||
};
|
||||
old_manager.shutdown().await;
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.store(Arc::new(refreshed_manager));
|
||||
}
|
||||
|
||||
pub(crate) async fn refresh_mcp_servers_if_requested(
|
||||
|
||||
@@ -793,7 +793,7 @@ impl Codex {
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let mcp_connection_manager = self.session.services.mcp_connection_manager.read().await;
|
||||
let mcp_connection_manager = self.session.services.mcp_connection_manager.load_full();
|
||||
mcp_connection_manager.set_elicitations_auto_deny(mcp_elicitations_auto_deny);
|
||||
Ok(())
|
||||
}
|
||||
@@ -2748,10 +2748,6 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP app context rendering reads through the session-owned manager guard"
|
||||
)]
|
||||
pub(crate) async fn build_initial_context(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
@@ -2844,7 +2840,7 @@ impl Session {
|
||||
}
|
||||
}
|
||||
if turn_context.config.include_apps_instructions && turn_context.apps_enabled() {
|
||||
let mcp_connection_manager = self.services.mcp_connection_manager.read().await;
|
||||
let mcp_connection_manager = self.services.mcp_connection_manager.load_full();
|
||||
let accessible_and_enabled_connectors =
|
||||
connectors::list_accessible_and_enabled_connectors_from_manager(
|
||||
&mcp_connection_manager,
|
||||
|
||||
@@ -981,13 +981,13 @@ impl Session {
|
||||
// before any MCP-related events. It is reasonable to consider
|
||||
// changing this to use Option or OnceCell, though the current
|
||||
// setup is straightforward enough and performs well.
|
||||
mcp_connection_manager: Arc::new(RwLock::new(
|
||||
mcp_connection_manager: arc_swap::ArcSwap::from_pointee(
|
||||
McpConnectionManager::new_uninitialized_with_permission_profile(
|
||||
&config.permissions.approval_policy,
|
||||
config.permissions.permission_profile(),
|
||||
config.prefix_mcp_tool_names(),
|
||||
),
|
||||
)),
|
||||
),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
|
||||
@@ -319,8 +319,7 @@ async fn request_mcp_server_elicitation_auto_accepts_when_auto_deny_is_enabled()
|
||||
session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.set_elicitations_auto_deny(/*auto_deny*/ true);
|
||||
|
||||
let requested_schema: McpElicitationSchema = serde_json::from_value(json!({
|
||||
@@ -4816,13 +4815,13 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
);
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(
|
||||
mcp_connection_manager: arc_swap::ArcSwap::from_pointee(
|
||||
McpConnectionManager::new_uninitialized_with_permission_profile(
|
||||
&config.permissions.approval_policy,
|
||||
config.permissions.permission_profile(),
|
||||
config.prefix_mcp_tool_names(),
|
||||
),
|
||||
)),
|
||||
),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
@@ -6894,13 +6893,13 @@ where
|
||||
);
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(
|
||||
mcp_connection_manager: arc_swap::ArcSwap::from_pointee(
|
||||
McpConnectionManager::new_uninitialized_with_permission_profile(
|
||||
&config.permissions.approval_policy,
|
||||
config.permissions.permission_profile(),
|
||||
config.prefix_mcp_tool_names(),
|
||||
),
|
||||
)),
|
||||
),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
@@ -9440,18 +9439,13 @@ async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "test builds a router from session-owned MCP manager state"
|
||||
)]
|
||||
async fn fatal_tool_error_stops_turn_and_reports_error() {
|
||||
let (session, turn_context, _rx) = make_session_and_context_with_rx().await;
|
||||
let tools = {
|
||||
session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.list_all_tools()
|
||||
.await
|
||||
};
|
||||
|
||||
@@ -432,10 +432,6 @@ async fn run_hooks_and_record_inputs(
|
||||
blocked_input && !accepted_user_input
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP tool listing borrows the read guard across cancellation-aware await"
|
||||
)]
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
async fn build_skills_and_plugins(
|
||||
sess: &Arc<Session>,
|
||||
@@ -473,8 +469,7 @@ async fn build_skills_and_plugins(
|
||||
match sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.list_all_tools()
|
||||
.or_cancel(cancellation_token)
|
||||
.await
|
||||
@@ -1078,10 +1073,6 @@ async fn run_sampling_request(
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "tool router construction reads through the session-owned manager guard"
|
||||
)]
|
||||
#[instrument(level = "trace",
|
||||
skip_all,
|
||||
fields(
|
||||
@@ -1095,18 +1086,12 @@ pub(crate) async fn built_tools(
|
||||
turn_context: &TurnContext,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> CodexResult<Arc<ToolRouter>> {
|
||||
let mcp_connection_manager = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.instrument(trace_span!("read_mcp_connection_manager"))
|
||||
.await;
|
||||
let mcp_connection_manager = sess.services.mcp_connection_manager.load_full();
|
||||
let has_mcp_servers = mcp_connection_manager.has_servers();
|
||||
let all_mcp_tools = mcp_connection_manager
|
||||
.list_all_tools()
|
||||
.or_cancel(cancellation_token)
|
||||
.await?;
|
||||
drop(mcp_connection_manager);
|
||||
let loaded_plugins = sess
|
||||
.services
|
||||
.plugins_manager
|
||||
|
||||
@@ -724,7 +724,7 @@ impl Session {
|
||||
.unwrap_or_else(|| session_configuration.cwd().clone());
|
||||
let per_turn_config = Self::build_per_turn_config(&session_configuration, cwd.clone());
|
||||
{
|
||||
let mcp_connection_manager = self.services.mcp_connection_manager.read().await;
|
||||
let mcp_connection_manager = self.services.mcp_connection_manager.load_full();
|
||||
mcp_connection_manager.set_approval_policy(&session_configuration.approval_policy);
|
||||
mcp_connection_manager
|
||||
.set_permission_profile(session_configuration.permission_profile());
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::time::Instant;
|
||||
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tokio_util::task::AbortOnDropHandle;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -19,7 +20,7 @@ use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::models::BaseInstructions;
|
||||
|
||||
pub(crate) struct SessionStartupPrewarmHandle {
|
||||
task: JoinHandle<CodexResult<ModelClientSession>>,
|
||||
task: AbortOnDropHandle<CodexResult<ModelClientSession>>,
|
||||
started_at: Instant,
|
||||
timeout: Duration,
|
||||
}
|
||||
@@ -40,12 +41,17 @@ impl SessionStartupPrewarmHandle {
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
task,
|
||||
task: AbortOnDropHandle::new(task),
|
||||
started_at,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn abort(self) {
|
||||
self.task.abort();
|
||||
let _ = self.task.await;
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
self,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
|
||||
@@ -35,12 +35,12 @@ use codex_thread_store::ThreadStore;
|
||||
use std::path::PathBuf;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub(crate) struct SessionServices {
|
||||
pub(crate) mcp_connection_manager: Arc<RwLock<McpConnectionManager>>,
|
||||
/// The latest manager; callers retain an owned handle while performing MCP I/O.
|
||||
pub(crate) mcp_connection_manager: ArcSwap<McpConnectionManager>,
|
||||
pub(crate) mcp_startup_cancellation_token: Mutex<CancellationToken>,
|
||||
pub(crate) unified_exec_manager: UnifiedExecProcessManager,
|
||||
#[cfg_attr(not(unix), allow(dead_code))]
|
||||
@@ -87,18 +87,13 @@ pub(crate) struct SessionServices {
|
||||
impl SessionServices {
|
||||
/// Installs the manager before validating required servers so startup-time elicitation can
|
||||
/// resolve through the session's manager while validation waits.
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "required MCP validation keeps the installed manager reachable for startup-time elicitation"
|
||||
)]
|
||||
pub(crate) async fn install_mcp_connection_manager(
|
||||
&self,
|
||||
manager: McpConnectionManager,
|
||||
) -> Result<()> {
|
||||
*self.mcp_connection_manager.write().await = manager;
|
||||
self.mcp_connection_manager.store(Arc::new(manager));
|
||||
self.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.validate_required_servers()
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -40,10 +40,6 @@ impl ToolExecutor<ToolInvocation> for ListMcpResourceTemplatesHandler {
|
||||
true
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP resource template listing reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn handle(
|
||||
&self,
|
||||
invocation: ToolInvocation,
|
||||
@@ -107,8 +103,7 @@ impl ToolExecutor<ToolInvocation> for ListMcpResourceTemplatesHandler {
|
||||
let templates = session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.list_all_resource_templates()
|
||||
.await;
|
||||
Ok(ListResourceTemplatesPayload::from_all_servers(templates))
|
||||
|
||||
@@ -40,10 +40,6 @@ impl ToolExecutor<ToolInvocation> for ListMcpResourcesHandler {
|
||||
true
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP resource listing reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn handle(
|
||||
&self,
|
||||
invocation: ToolInvocation,
|
||||
@@ -105,8 +101,7 @@ impl ToolExecutor<ToolInvocation> for ListMcpResourcesHandler {
|
||||
let resources = session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.list_all_resources()
|
||||
.await;
|
||||
Ok(ListResourcesPayload::from_all_servers(resources))
|
||||
|
||||
@@ -304,10 +304,6 @@ fn is_remote_plugin_install_suggestion(plugin_id: &str) -> bool {
|
||||
.is_some_and(|(_, marketplace_name)| marketplace_name == REMOTE_GLOBAL_MARKETPLACE_NAME)
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "connector cache refresh reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn refresh_missing_requested_connectors(
|
||||
session: &crate::session::session::Session,
|
||||
turn: &crate::session::turn_context::TurnContext,
|
||||
@@ -319,7 +315,7 @@ async fn refresh_missing_requested_connectors(
|
||||
return Some(Vec::new());
|
||||
}
|
||||
|
||||
let manager = session.services.mcp_connection_manager.read().await;
|
||||
let manager = session.services.mcp_connection_manager.load_full();
|
||||
let mcp_tools = manager.list_all_tools().await;
|
||||
let accessible_connectors = connectors::with_app_enabled_state(
|
||||
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
|
||||
|
||||
@@ -95,17 +95,12 @@ fn extension_tool_test_registry() -> Arc<ExtensionRegistry<Config>> {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "test builds a router from session-owned MCP manager state"
|
||||
)]
|
||||
async fn parallel_support_does_not_match_namespaced_local_tool_names() -> anyhow::Result<()> {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let mcp_tools = session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.load_full()
|
||||
.list_all_tools()
|
||||
.await;
|
||||
let router = ToolRouter::from_turn_context(
|
||||
|
||||
@@ -558,6 +558,52 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn shutdown_cancels_startup_prewarm_waiting_for_mcp_startup() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_websocket_server(vec![vec![vec![
|
||||
responses::ev_response_created("warm-1"),
|
||||
responses::ev_completed("warm-1"),
|
||||
]]])
|
||||
.await;
|
||||
let pending_mcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
|
||||
let pending_mcp_url = format!("http://{}/mcp", pending_mcp_listener.local_addr()?);
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_config(move |config| {
|
||||
insert_mcp_server(
|
||||
config,
|
||||
"shutdown_prewarm",
|
||||
McpServerTransportConfig::StreamableHttp {
|
||||
url: pending_mcp_url,
|
||||
bearer_token_env_var: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
},
|
||||
TestMcpServerOptions::default(),
|
||||
);
|
||||
})
|
||||
.build_with_websocket_server(&server)
|
||||
.await?;
|
||||
|
||||
let (_pending_mcp_connection, _) =
|
||||
tokio::time::timeout(Duration::from_secs(5), pending_mcp_listener.accept())
|
||||
.await
|
||||
.context("startup prewarm should start the MCP connection")??;
|
||||
tokio::time::timeout(Duration::from_secs(2), fixture.codex.shutdown_and_wait())
|
||||
.await
|
||||
.context("shutdown should not wait for startup prewarm MCP startup")??;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
assert!(
|
||||
server.connections().is_empty(),
|
||||
"startup prewarm should not send a websocket request after shutdown"
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
#[serial(mcp_cwd)]
|
||||
async fn stdio_server_uses_configured_cwd_before_runtime_fallback() -> anyhow::Result<()> {
|
||||
|
||||
Reference in New Issue
Block a user