Project selected plugin runtime by environment availability (#30093)

## Why

Selected plugin metadata is stable, but MCP processes are live runtime
state. They need different lifetimes:

- the MCP extension caches manifest, MCP, and connector declarations for
each stable selected root;
- each model step projects that cached metadata through the roots that
resolved as ready for that exact step;
- the MCP manager is rebuilt only when that availability projection
changes.

This matches executor skills: both features consume the same resolved
step roots instead of inferring readiness from the turn's selected
environments.

## Behavior

```text
E1 not ready for this step
  -> no E1 MCP servers or connectors
  -> cached plugin metadata stays in ext/mcp

E1 becomes ready
  -> reuse cached metadata
  -> publish one MCP runtime containing E1 capabilities

same ready roots on the next step
  -> reuse the exact runtime; no rediscovery and no MCP restart

resume
  -> create new extension thread state and a new MCP runtime
```

All model-facing consumers use the same step snapshot:

```text
resolved selected roots
        |
        v
extension MCP/connector projection
        |
        v
{ MCP config, connector snapshot, MCP manager }
        |
        +-> advertise model tools
        +-> build app/connector tools
        +-> execute MCP calls
```

## Cache contract

The existing MCP extension owns a cache keyed by the full
`SelectedCapabilityRoot`:

```rust
let state = thread_store.get_or_init(SelectedExecutorPluginMcpState::default);
```

The cache lives with extension thread state. Environment availability
filters projection but does not invalidate metadata. Resume creates new
thread state. There is no file watcher or executor generation because
contents behind a stable environment/root are assumed stable.

## What changes

- Keeps executor plugin discovery and cached metadata in `ext/mcp`.
- Caches MCP and connector declarations together per selected root.
- Uses the step's already-resolved capability roots, including lazy
environments that are not turn environments.
- Reuses the current MCP runtime when the ready-root projection is
unchanged.
- Uses the same step MCP manager and connector snapshot for
model-visible tools and execution.
- Resolves direct thread-scoped MCP requests from the current
selected-root projection.

## Deliberately out of scope

- `app/list` remains based on the latest global host-plugin state; this
PR does not make its response or notifications thread-specific.
- `required = true` startup semantics do not apply to delayed executor
MCP activation.
- No filesystem/content invalidation.
- No transport-disconnect watcher.
- No executor generations or environment replacement semantics.
- No client sharing across complete manager replacements.

## Stack

1. Extension-owned World State sections.
2. Project executor skills through World State.
3. Pin one MCP runtime to each model step.
4. **This PR:** project selected MCP and connector state from
extension-owned metadata.
5. Integration coverage for selected capability availability and resume.

## Verification

-
`selected_plugin_servers_use_managed_requirements_for_the_selected_root_id`
- The stacked integration PR covers unavailable to ready activation,
unchanged-runtime reuse, skills, MCP tools, connector attribution, and
cold resume.
This commit is contained in:
jif
2026-06-26 01:36:44 +01:00
committed by GitHub
Unverified
parent 5044062704
commit 3095ea9c3d
21 changed files with 455 additions and 163 deletions
+1
View File
@@ -3359,6 +3359,7 @@ name = "codex-mcp-extension"
version = "0.0.0"
dependencies = [
"codex-config",
"codex-connectors-extension",
"codex-core",
"codex-core-plugins",
"codex-exec-server",
+1
View File
@@ -166,6 +166,7 @@ codex-code-mode-protocol = { path = "code-mode-protocol" }
codex-home = { path = "codex-home" }
codex-config = { path = "config" }
codex-connectors = { path = "connectors" }
codex-connectors-extension = { path = "ext/connectors" }
codex-context-fragments = { path = "context-fragments" }
codex-core = { path = "core" }
codex-core-api = { path = "core-api" }
@@ -124,7 +124,7 @@ impl McpRequestProcessor {
let (mcp_config, runtime_context) = match thread_id.as_deref() {
Some(thread_id) => {
let (_, thread) = self.load_thread(thread_id).await?;
let runtime = thread.current_mcp_runtime();
let runtime = thread.current_mcp_runtime().await;
(runtime.config().clone(), runtime.runtime_context().clone())
}
None => {
@@ -246,14 +246,21 @@ impl McpRequestProcessor {
let mcp_manager = self.thread_manager.mcp_manager();
let codex_apps_tools_cache = mcp_manager.codex_apps_tools_cache();
let auth = self.auth_manager.auth().await;
let mcp_config = match thread {
Some(thread) => thread.runtime_mcp_config(&config).await,
None => mcp_manager.runtime_config(&config).await,
let (mcp_config, runtime_context) = match thread {
Some(thread) => {
let mcp_config = thread.runtime_mcp_config(&config).await;
let runtime = thread.current_mcp_runtime().await;
(mcp_config, runtime.runtime_context().clone())
}
None => {
let mcp_config = mcp_manager.runtime_config(&config).await;
let runtime_context = McpRuntimeContext::new(
self.thread_manager.environment_manager(),
config.cwd.to_path_buf(),
);
(mcp_config, runtime_context)
}
};
let runtime_context = McpRuntimeContext::new(
self.thread_manager.environment_manager(),
config.cwd.to_path_buf(),
);
tokio::spawn(async move {
Self::list_mcp_server_status_task(
@@ -1155,7 +1155,6 @@ impl ThreadRequestProcessor {
let mut thread_extension_init = ExtensionDataInit::new();
if !selected_capability_roots.is_empty() {
thread_extension_init.insert(selected_capability_roots);
codex_mcp_extension::initialize_executor_plugin_thread_data(&mut thread_extension_init);
}
let create_thread_started_at = std::time::Instant::now();
let NewThread {
@@ -191,7 +191,6 @@ HTTP_PROXY = {http_proxy}
)
.await?;
std::fs::write(plugin.path().join(".mcp.json"), r#"{"mcpServers":{}}"#)?;
let config_path = codex_home.path().join("config.toml");
let mut config = std::fs::read_to_string(&config_path)?;
config.push_str(&format!(
+14 -6
View File
@@ -595,8 +595,14 @@ impl CodexThread {
}
/// Returns the exact MCP config, environment bindings, and manager most recently published.
pub fn current_mcp_runtime(&self) -> Arc<crate::session::McpRuntimeSnapshot> {
self.codex.session.services.latest_mcp_runtime()
pub async fn current_mcp_runtime(&self) -> Arc<crate::session::McpRuntimeSnapshot> {
let turn_context = self.codex.session.new_default_turn().await;
self.codex
.session
.capture_step_context(turn_context)
.await
.mcp
.clone()
}
pub fn multi_agent_version(&self) -> Option<MultiAgentVersion> {
@@ -620,8 +626,9 @@ impl CodexThread {
uri: &str,
) -> anyhow::Result<serde_json::Value> {
let result = self
.codex
.session
.current_mcp_runtime()
.await
.manager_arc()
.read_resource(server, ReadResourceRequestParams::new(uri))
.await?;
@@ -635,8 +642,9 @@ impl CodexThread {
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
) -> anyhow::Result<CallToolResult> {
self.codex
.session
self.current_mcp_runtime()
.await
.manager_arc()
.call_tool(server, tool, arguments, meta)
.await
}
+35 -10
View File
@@ -3,7 +3,10 @@ use std::sync::Arc;
use crate::config::Config;
use codex_config::McpServerConfig;
use codex_connectors::ConnectorSnapshot;
use codex_connectors::PluginConnectorSource;
use codex_core_plugins::PluginsManager;
use codex_extension_api::ExtensionData;
use codex_extension_api::ExtensionDataInit;
use codex_extension_api::ExtensionRegistry;
use codex_extension_api::McpServerContribution;
@@ -18,6 +21,7 @@ use codex_mcp::McpServerRegistration;
use codex_mcp::codex_apps_mcp_server_config;
use codex_mcp::configured_mcp_servers;
use codex_mcp::effective_mcp_servers;
use codex_plugin::AppConnectorId;
const LEGACY_CODEX_APPS_REGISTRATION_ID: &str = "legacy_codex_apps";
@@ -69,28 +73,32 @@ impl McpManager {
/// Returns the MCP config after applying compatibility built-ins and
/// runtime-only extension overlays.
pub async fn runtime_config(&self, config: &Config) -> McpConfig {
self.runtime_config_with_context(config, /*thread_init*/ None)
self.runtime_config_with_context(McpServerContributionContext::global(config))
.await
}
pub(crate) async fn runtime_config_for_thread(
pub(crate) async fn runtime_config_for_step(
&self,
config: &Config,
thread_init: &ExtensionDataInit,
thread_store: &ExtensionData,
available_environment_ids: &[String],
) -> McpConfig {
self.runtime_config_with_context(config, Some(thread_init))
.await
self.runtime_config_with_context(McpServerContributionContext::for_step(
config,
thread_init,
thread_store,
available_environment_ids,
))
.await
}
async fn runtime_config_with_context(
&self,
config: &Config,
thread_init: Option<&ExtensionDataInit>,
context: McpServerContributionContext<'_, Config>,
) -> McpConfig {
let context = match thread_init {
Some(thread_init) => McpServerContributionContext::for_thread(config, thread_init),
None => McpServerContributionContext::global(config),
};
let config = context.config();
let mut selected_plugin_connector_sources = Vec::new();
let mut selected_plugin_registrations = Vec::new();
let mut overlays = Vec::new();
// A contributor can emit multiple ordered actions, so order each action globally rather
@@ -121,6 +129,17 @@ impl McpManager {
*config,
),
),
McpServerContribution::SelectedPluginConnectors {
plugin_id,
plugin_display_name,
connector_ids,
} => selected_plugin_connector_sources.push(
PluginConnectorSource::from_connector_ids(
plugin_id,
plugin_display_name,
connector_ids.into_iter().map(AppConnectorId),
),
),
McpServerContribution::Remove { name } => {
overlays.push(OrderedMcpOverlay::Remove {
contributor_id: contributor.id(),
@@ -186,6 +205,12 @@ impl McpManager {
);
}
mcp_config.mcp_server_catalog = catalog;
mcp_config.connector_snapshot =
mcp_config
.connector_snapshot
.merged_with(&ConnectorSnapshot::from_plugin_sources(
selected_plugin_connector_sources,
));
mcp_config
}
+2 -2
View File
@@ -64,7 +64,7 @@ pub async fn build_prompt_input(
);
let thread = thread_manager.start_thread(config).await?;
let output = build_prompt_input_from_session(thread.thread.codex.session.as_ref(), input).await;
let output = build_prompt_input_from_session(&thread.thread.codex.session, input).await;
let shutdown = thread.thread.shutdown_and_wait().await;
let _removed = thread_manager.remove_thread(&thread.thread_id).await;
@@ -73,7 +73,7 @@ pub async fn build_prompt_input(
}
pub(crate) async fn build_prompt_input_from_session(
sess: &Session,
sess: &Arc<Session>,
input: Vec<UserInput>,
) -> CodexResult<Vec<ResponseItem>> {
let turn_context = sess.new_default_turn().await;
+153 -43
View File
@@ -1,7 +1,9 @@
use super::*;
use codex_exec_server::ResolvedSelectedCapabilityRoot;
use codex_mcp::ElicitationReviewRequest;
use codex_mcp::ElicitationReviewer;
use codex_mcp::ElicitationReviewerHandle;
use codex_protocol::capabilities::CapabilityRootLocation;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::mcp_approval_meta::APPROVAL_KIND_KEY as MCP_ELICITATION_APPROVAL_KIND_KEY;
use codex_protocol::mcp_approval_meta::APPROVAL_KIND_MCP_TOOL_CALL as MCP_ELICITATION_APPROVAL_KIND_MCP_TOOL_CALL;
@@ -75,9 +77,20 @@ impl ElicitationReviewer for GuardianMcpElicitationReviewer {
impl Session {
pub(crate) async fn runtime_mcp_config(&self, config: &Config) -> McpConfig {
let environments = self.services.turn_environments.snapshot().await;
let selected_capability_roots = self
.resolve_selected_capability_roots_for_step(&environments)
.await;
let available_environment_ids =
Self::available_selected_environment_ids(&selected_capability_roots);
self.services
.mcp_manager
.runtime_config_for_thread(config, &self.services.mcp_thread_init)
.runtime_config_for_step(
config,
&self.services.mcp_thread_init,
&self.services.thread_extension_data,
&available_environment_ids,
)
.await
}
@@ -88,6 +101,62 @@ impl Session {
codex_mcp::configured_mcp_servers(&self.runtime_mcp_config(config).await)
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP runtime comparison and publication must remain serialized"
)]
pub(crate) async fn mcp_runtime_for_step(
self: &Arc<Self>,
turn_context: &TurnContext,
environments: &TurnEnvironmentSnapshot,
selected_capability_roots: &[ResolvedSelectedCapabilityRoot],
) -> Arc<McpRuntimeSnapshot> {
let available_environment_ids =
Self::available_selected_environment_ids(selected_capability_roots);
let current = self.services.latest_mcp_runtime();
if current.available_environment_ids() == available_environment_ids {
return current;
}
let _guard = self.services.mcp_projection_lock.lock().await;
let current = self.services.latest_mcp_runtime();
if current.available_environment_ids() == available_environment_ids {
return current;
}
let mcp_config = self
.services
.mcp_manager
.runtime_config_for_step(
&turn_context.config,
&self.services.mcp_thread_init,
&self.services.thread_extension_data,
&available_environment_ids,
)
.await;
self.refresh_mcp_servers_inner(
turn_context,
mcp_config,
environments,
&available_environment_ids,
Some(self.mcp_elicitation_reviewer()),
)
.await
}
pub(crate) async fn resolve_selected_capability_roots_for_step(
&self,
environments: &TurnEnvironmentSnapshot,
) -> Vec<ResolvedSelectedCapabilityRoot> {
self.services
.turn_environments
.environment_manager()
.resolve_selected_capability_roots(
&self.services.selected_capability_roots,
&environments.captured_environments(),
)
.await
}
pub(crate) fn mcp_elicitation_reviewer(self: &Arc<Self>) -> ElicitationReviewerHandle {
Arc::new(GuardianMcpElicitationReviewer::new(self))
}
@@ -207,51 +276,22 @@ impl Session {
.await
}
pub async fn read_resource(
&self,
server: &str,
params: ReadResourceRequestParams,
) -> anyhow::Result<ReadResourceResult> {
self.services
.latest_mcp_runtime()
.manager_arc()
.read_resource(server, params)
.await
}
pub async fn call_tool(
&self,
server: &str,
tool: &str,
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
) -> anyhow::Result<CallToolResult> {
self.services
.latest_mcp_runtime()
.manager_arc()
.call_tool(server, tool, arguments, meta)
.await
}
async fn refresh_mcp_servers_inner(
&self,
turn_context: &TurnContext,
mut mcp_config: McpConfig,
configured_mcp_servers: HashMap<String, McpServerConfig>,
mcp_config: McpConfig,
environments: &TurnEnvironmentSnapshot,
available_environment_ids: &[String],
elicitation_reviewer: Option<ElicitationReviewerHandle>,
) {
mcp_config.mcp_server_catalog = mcp_config
.mcp_server_catalog
.with_materialized_servers(configured_mcp_servers);
let mcp_config = Arc::new(mcp_config);
) -> Arc<McpRuntimeSnapshot> {
let auth = self.services.auth_manager.auth().await;
let mcp_config = Arc::new(mcp_config);
let tool_plugin_provenance = codex_mcp::tool_plugin_provenance(&mcp_config);
let mcp_servers = effective_mcp_servers(&mcp_config, auth.as_ref());
let environment_manager = self.services.turn_environments.environment_manager();
// TODO(anp): Migrate MCP runtime cwd plumbing to PathUri so foreign environment cwd
// values can be used without falling back to the legacy host cwd.
let cwd = turn_context
.environments
let cwd = environments
.primary()
.and_then(|turn_environment| turn_environment.cwd().to_abs_path().ok())
.map(|cwd| cwd.to_path_buf())
@@ -305,10 +345,18 @@ impl Session {
refreshed_manager
.set_elicitations_auto_deny(current_manager.manager().elicitations_auto_deny());
}
self.services
.publish_mcp_runtime(mcp_config, mcp_runtime_context, refreshed_manager);
self.services.publish_mcp_runtime(
mcp_config,
mcp_runtime_context,
available_environment_ids.to_vec(),
refreshed_manager,
)
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP runtime refresh and publication must remain serialized"
)]
pub(crate) async fn refresh_mcp_servers_if_requested(
&self,
turn_context: &TurnContext,
@@ -365,9 +413,33 @@ impl Session {
return;
}
let mcp_config = self.runtime_mcp_config(&refresh_config).await;
self.refresh_mcp_servers_inner(turn_context, mcp_config, mcp_servers, elicitation_reviewer)
let _guard = self.services.mcp_projection_lock.lock().await;
let available_environment_ids = self
.services
.latest_mcp_runtime()
.available_environment_ids()
.to_vec();
let mut mcp_config = self
.services
.mcp_manager
.runtime_config_for_step(
&refresh_config,
&self.services.mcp_thread_init,
&self.services.thread_extension_data,
&available_environment_ids,
)
.await;
mcp_config.mcp_server_catalog = mcp_config
.mcp_server_catalog
.with_materialized_servers(mcp_servers);
self.refresh_mcp_servers_inner(
turn_context,
mcp_config,
&turn_context.environments,
&available_environment_ids,
elicitation_reviewer,
)
.await;
}
pub(crate) async fn set_openai_form_elicitation_support(
@@ -398,16 +470,54 @@ impl Session {
Ok(())
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP runtime refresh and publication must remain serialized"
)]
pub(crate) async fn refresh_mcp_servers_now(
&self,
turn_context: &TurnContext,
refresh_config: &Config,
elicitation_reviewer: Option<ElicitationReviewerHandle>,
) {
let mcp_config = self.runtime_mcp_config(refresh_config).await;
let mcp_servers = codex_mcp::configured_mcp_servers(&mcp_config);
self.refresh_mcp_servers_inner(turn_context, mcp_config, mcp_servers, elicitation_reviewer)
let _guard = self.services.mcp_projection_lock.lock().await;
let available_environment_ids = self
.services
.latest_mcp_runtime()
.available_environment_ids()
.to_vec();
let mcp_config = self
.services
.mcp_manager
.runtime_config_for_step(
refresh_config,
&self.services.mcp_thread_init,
&self.services.thread_extension_data,
&available_environment_ids,
)
.await;
self.refresh_mcp_servers_inner(
turn_context,
mcp_config,
&turn_context.environments,
&available_environment_ids,
elicitation_reviewer,
)
.await;
}
fn available_selected_environment_ids(
selected_capability_roots: &[ResolvedSelectedCapabilityRoot],
) -> Vec<String> {
let mut available = Vec::new();
for root in selected_capability_roots {
let CapabilityRootLocation::Environment { environment_id, .. } =
&root.selected_root().location;
if !available.contains(environment_id) {
available.push(environment_id.clone());
}
}
available
}
#[cfg(test)]
+8
View File
@@ -10,6 +10,7 @@ pub struct McpRuntimeSnapshot {
config: Arc<McpConfig>,
manager: Arc<McpConnectionManager>,
runtime_context: McpRuntimeContext,
available_environment_ids: Vec<String>,
}
impl McpRuntimeSnapshot {
@@ -17,11 +18,13 @@ impl McpRuntimeSnapshot {
config: Arc<McpConfig>,
manager: Arc<McpConnectionManager>,
runtime_context: McpRuntimeContext,
available_environment_ids: Vec<String>,
) -> Self {
Self {
config,
manager,
runtime_context,
available_environment_ids,
}
}
@@ -41,6 +44,10 @@ impl McpRuntimeSnapshot {
&self.runtime_context
}
pub(crate) fn available_environment_ids(&self) -> &[String] {
&self.available_environment_ids
}
#[cfg(test)]
pub(crate) fn new_uninitialized_for_test(config: &crate::config::Config) -> Arc<Self> {
use codex_exec_server::EnvironmentManager;
@@ -81,6 +88,7 @@ impl McpRuntimeSnapshot {
Arc::new(mcp_config),
Arc::new(manager),
runtime_context,
Vec::new(),
))
}
}
+8 -11
View File
@@ -100,7 +100,6 @@ use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::models::ActivePermissionProfile;
use codex_protocol::models::AdditionalPermissionProfile;
use codex_protocol::models::BaseInstructions;
@@ -158,8 +157,6 @@ use codex_utils_path_uri::PathUri;
use futures::future::BoxFuture;
use futures::future::Shared;
use futures::prelude::*;
use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use rmcp::model::RequestId;
use serde_json::Value;
use tokio::sync::Mutex;
@@ -2825,7 +2822,7 @@ impl Session {
/// `run_turn` and pass the result down; standalone request or history boundaries may capture
/// their own step.
pub(crate) async fn capture_step_context(
&self,
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
) -> Arc<StepContext> {
let deferred_executor_enabled = turn_context
@@ -2846,15 +2843,15 @@ impl Session {
}
let loaded_agents_md = self.services.agents_md_manager.get_loaded().await;
let selected_capability_roots = self
.services
.turn_environments
.environment_manager()
.resolve_selected_capability_roots(
&self.services.selected_capability_roots,
&environments.captured_environments(),
.resolve_selected_capability_roots_for_step(&environments)
.await;
let mcp = self
.mcp_runtime_for_step(
turn_context.as_ref(),
&environments,
&selected_capability_roots,
)
.await;
let mcp = self.services.latest_mcp_runtime();
Arc::new(StepContext::new(
turn_context,
environments,
+9 -1
View File
@@ -661,6 +661,7 @@ impl Session {
let config_for_mcp = Arc::clone(&config);
let mcp_manager_for_mcp = Arc::clone(&mcp_manager);
let mcp_thread_init_for_startup = &mcp_thread_init;
let thread_extension_data_for_mcp = &thread_extension_data;
let mcp_runtime_cwd = session_configuration
.environment_selections()
.first()
@@ -673,7 +674,12 @@ impl Session {
let auth_and_mcp_fut = async move {
let auth = auth_manager_clone.auth().await;
let mcp_config = mcp_manager_for_mcp
.runtime_config_for_thread(&config_for_mcp, mcp_thread_init_for_startup)
.runtime_config_for_step(
&config_for_mcp,
mcp_thread_init_for_startup,
thread_extension_data_for_mcp,
/*available_environment_ids*/ &[],
)
.await;
let mcp_servers = codex_mcp::effective_mcp_servers(&mcp_config, auth.as_ref());
let tool_plugin_provenance = codex_mcp::tool_plugin_provenance(&mcp_config);
@@ -1039,6 +1045,7 @@ impl Session {
// setup is straightforward enough and performs well.
mcp_connection_manager,
mcp_runtime: arc_swap::ArcSwapOption::empty(),
mcp_projection_lock: Mutex::new(()),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
@@ -1210,6 +1217,7 @@ impl Session {
.install_mcp_connection_manager(
Arc::new(mcp_config),
mcp_runtime_context,
/*available_environment_ids*/ Vec::new(),
mcp_connection_manager,
)
.await?;
+3
View File
@@ -5372,6 +5372,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
let services = SessionServices {
mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from(mcp_runtime.manager_arc())),
mcp_runtime: arc_swap::ArcSwapOption::from(Some(mcp_runtime)),
mcp_projection_lock: Mutex::new(()),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
@@ -7446,6 +7447,7 @@ where
let services = SessionServices {
mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from(mcp_runtime.manager_arc())),
mcp_runtime: arc_swap::ArcSwapOption::from(Some(mcp_runtime)),
mcp_projection_lock: Mutex::new(()),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
@@ -7603,6 +7605,7 @@ pub(crate) async fn make_session_and_context_with_rx() -> (
#[tokio::test]
async fn refresh_mcp_servers_keeps_the_previous_runtime_alive() {
let (session, turn_context) = make_session_and_context().await;
let session = Arc::new(session);
let turn_context = Arc::new(turn_context);
let old_runtime = session.services.latest_mcp_runtime();
let step_context = session
+14 -13
View File
@@ -171,8 +171,13 @@ pub(crate) async fn run_turn(
.record_context_updates_and_set_reference_context_item(first_step_context.as_ref())
.await;
let Some((injection_items, explicitly_enabled_connectors)) =
build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await
let Some((injection_items, explicitly_enabled_connectors)) = build_skills_and_plugins(
&sess,
first_step_context.as_ref(),
&input,
&cancellation_token,
)
.await
else {
return Ok(None);
};
@@ -504,10 +509,11 @@ async fn run_hooks_and_record_inputs(
#[instrument(level = "trace", skip_all)]
async fn build_skills_and_plugins(
sess: &Arc<Session>,
turn_context: &TurnContext,
step_context: &StepContext,
input: &[TurnInput],
cancellation_token: &CancellationToken,
) -> Option<(Vec<ResponseItem>, HashSet<String>)> {
let turn_context = step_context.turn.as_ref();
// Guardian input embeds the parent transcript as untrusted evidence. Do not interpret skill or
// plugin mentions from that generated prompt as requests to inject additional instructions.
if crate::guardian::is_guardian_reviewer_source(&turn_context.session_source) {
@@ -538,17 +544,14 @@ async fn build_skills_and_plugins(
// enabled plugins, then converted into turn-scoped guidance below.
let mentioned_plugins =
collect_explicit_plugin_mentions(&user_input, loaded_plugins.capability_summaries());
let connector_snapshot = codex_connectors::ConnectorSnapshot::from_plugin_capability_summaries(
loaded_plugins.capability_summaries(),
);
let connector_snapshot = step_context.mcp.config().connector_snapshot.clone();
let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() {
// Plugin mentions need raw MCP/app inventory even when app tools
// are normally hidden so we can describe the plugin's currently
// usable capabilities for this turn.
match sess
.services
.mcp_connection_manager
.load_full()
match step_context
.mcp
.manager_arc()
.list_all_tools()
.or_cancel(cancellation_token)
.await
@@ -1189,9 +1192,7 @@ pub(crate) async fn built_tools(
.plugins_for_config(&turn_context.config.plugins_config_input())
.instrument(trace_span!("built_tools.load_plugins"))
.await;
let connector_snapshot = codex_connectors::ConnectorSnapshot::from_plugin_capability_summaries(
loaded_plugins.capability_summaries(),
);
let connector_snapshot = step_context.mcp.config().connector_snapshot.clone();
let apps_enabled = turn_context.apps_enabled();
let accessible_connectors =
+12 -2
View File
@@ -51,6 +51,8 @@ pub(crate) struct SessionServices {
pub(crate) mcp_connection_manager: Arc<ArcSwap<McpConnectionManager>>,
/// The latest atomically published MCP config and manager pair.
pub(crate) mcp_runtime: ArcSwapOption<McpRuntimeSnapshot>,
/// Serializes environment-driven runtime rebuilds.
pub(crate) mcp_projection_lock: Mutex<()>,
pub(crate) mcp_startup_cancellation_token: Mutex<CancellationToken>,
pub(crate) unified_exec_manager: UnifiedExecProcessManager,
#[cfg_attr(not(unix), allow(dead_code))]
@@ -106,9 +108,11 @@ impl SessionServices {
&self,
config: Arc<McpConfig>,
runtime_context: McpRuntimeContext,
available_environment_ids: Vec<String>,
manager: McpConnectionManager,
) -> Result<()> {
let runtime = self.publish_mcp_runtime(config, runtime_context, manager);
let runtime =
self.publish_mcp_runtime(config, runtime_context, available_environment_ids, manager);
runtime.manager().validate_required_servers().await
}
@@ -116,13 +120,19 @@ impl SessionServices {
&self,
config: Arc<McpConfig>,
runtime_context: McpRuntimeContext,
available_environment_ids: Vec<String>,
manager: McpConnectionManager,
) -> Arc<McpRuntimeSnapshot> {
let manager = Arc::new(manager);
// Publish the manager for legacy resource clients first. Once the paired snapshot is
// visible, every model-scoped consumer observes this exact manager.
self.mcp_connection_manager.store(Arc::clone(&manager));
let runtime = Arc::new(McpRuntimeSnapshot::new(config, manager, runtime_context));
let runtime = Arc::new(McpRuntimeSnapshot::new(
config,
manager,
runtime_context,
available_environment_ids,
));
self.mcp_runtime.store(Some(Arc::clone(&runtime)));
runtime
}
+22 -2
View File
@@ -584,8 +584,28 @@ async fn start_thread_seeds_extension_data_for_mcp_and_lifecycle_contributors()
})
.await
.expect("start second thread");
let first_resolved = first_thread.thread.runtime_mcp_config(&config).await;
let second_resolved = second_thread.thread.runtime_mcp_config(&config).await;
let first_session = &first_thread.thread.codex.session;
let first_resolved = first_session
.services
.mcp_manager
.runtime_config_for_step(
&config,
&first_session.services.mcp_thread_init,
&first_session.services.thread_extension_data,
/*available_environment_ids*/ &[],
)
.await;
let second_session = &second_thread.thread.codex.session;
let second_resolved = second_session
.services
.mcp_manager
.runtime_config_for_step(
&config,
&second_session.services.mcp_thread_init,
&second_session.services.thread_extension_data,
/*available_environment_ids*/ &[],
)
.await;
assert_eq!(
*lifecycle_observed
@@ -1,17 +1,22 @@
use codex_config::McpServerConfig;
use crate::ExtensionData;
use crate::ExtensionDataInit;
/// Input supplied while resolving MCP server contributions.
///
/// Thread-scoped implementations can read the immutable host-seeded inputs
/// through [`Self::thread_init`]. Implementations should not retain borrowed
/// context after contribution completes.
/// Thread-scoped implementations can read stable host inputs through [`Self::thread_init`] and
/// keep their cache in [`Self::thread_store`]. Implementations should not retain borrowed context
/// after contribution completes.
pub struct McpServerContributionContext<'a, C> {
/// Host configuration visible during MCP resolution.
config: &'a C,
/// Initial inputs for the active thread, when resolution is thread-scoped.
/// Extension-owned data for the active thread, when resolution is thread-scoped.
thread_store: Option<&'a ExtensionData>,
/// Stable host inputs for the active thread, when resolution is thread-scoped.
thread_init: Option<&'a ExtensionDataInit>,
/// Environment IDs whose selected roots may contribute to this exact step.
available_environment_ids: Option<&'a [String]>,
}
impl<C> Clone for McpServerContributionContext<'_, C> {
@@ -27,15 +32,24 @@ impl<'a, C> McpServerContributionContext<'a, C> {
pub fn global(config: &'a C) -> Self {
Self {
config,
thread_store: None,
thread_init: None,
available_environment_ids: None,
}
}
/// Creates context for one active thread runtime.
pub fn for_thread(config: &'a C, thread_init: &'a ExtensionDataInit) -> Self {
/// Creates context for one model step using only currently available environments.
pub fn for_step(
config: &'a C,
thread_init: &'a ExtensionDataInit,
thread_store: &'a ExtensionData,
available_environment_ids: &'a [String],
) -> Self {
Self {
config,
thread_store: Some(thread_store),
thread_init: Some(thread_init),
available_environment_ids: Some(available_environment_ids),
}
}
@@ -44,10 +58,23 @@ impl<'a, C> McpServerContributionContext<'a, C> {
self.config
}
/// Returns the frozen initial inputs when resolving for a running thread.
/// Returns extension-owned state when resolving for a running thread.
pub fn thread_store(&self) -> Option<&'a ExtensionData> {
self.thread_store
}
/// Returns stable host inputs when resolving for a running thread.
pub fn thread_init(&self) -> Option<&'a ExtensionDataInit> {
self.thread_init
}
/// Returns the exact environment availability projection for a model step.
///
/// `Some` means contributors must omit selected roots whose environment ID is absent from the
/// slice. Global resolution returns `None` because it has no thread environments.
pub fn available_environment_ids(&self) -> Option<&'a [String]> {
self.available_environment_ids
}
}
/// One extension-owned overlay for the runtime MCP server configuration.
@@ -66,6 +93,12 @@ pub enum McpServerContribution {
selection_order: usize,
config: Box<McpServerConfig>,
},
/// Adds connector IDs declared by a plugin selected for this thread.
SelectedPluginConnectors {
plugin_id: String,
plugin_display_name: String,
connector_ids: Vec<String>,
},
/// Removes a named MCP server.
Remove { name: String },
}
+1
View File
@@ -16,6 +16,7 @@ workspace = true
codex-core = { workspace = true }
codex-core-plugins = { workspace = true }
codex-config = { workspace = true }
codex-connectors-extension = { workspace = true }
codex-exec-server = { workspace = true }
codex-extension-api = { workspace = true }
codex-features = { workspace = true }
+108 -46
View File
@@ -1,44 +1,47 @@
use codex_connectors_extension::ExecutorPluginConnectorProvider;
use codex_core::config::Config;
use codex_core_plugins::ExecutorPluginProvider;
use codex_exec_server::EnvironmentManager;
use codex_extension_api::ExtensionDataInit;
use codex_extension_api::ExtensionFuture;
use codex_extension_api::McpServerContribution;
use codex_extension_api::McpServerContributionContext;
use codex_extension_api::McpServerContributor;
use codex_protocol::capabilities::CapabilityRootLocation;
use codex_protocol::capabilities::SelectedCapabilityRoot;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::OnceCell;
use std::sync::Mutex;
use self::provider::ExecutorPluginMcpProvider;
mod provider;
/// Frozen MCP declarations for one selected package.
/// Frozen MCP and connector declarations for one selected package.
///
/// Each server config retains the stable logical environment ID. Reconnection may replace the
/// concrete environment instance without changing that authority.
#[derive(Clone)]
struct SelectedPluginMcpServers {
struct SelectedPluginMetadata {
plugin_id: String,
plugin_display_name: String,
selection_order: usize,
servers: Vec<(String, codex_config::McpServerConfig)>,
connector_ids: Vec<String>,
}
#[derive(Default)]
pub(crate) struct SelectedExecutorPluginMcpState {
snapshot: OnceCell<Vec<SelectedPluginMcpServers>>,
cache: Mutex<Vec<CachedSelectedRoot>>,
}
pub(crate) fn seed_thread_state(thread_init: &mut ExtensionDataInit) {
thread_init.insert(SelectedExecutorPluginMcpState::default());
struct CachedSelectedRoot {
root: SelectedCapabilityRoot,
metadata: Option<SelectedPluginMetadata>,
}
pub(crate) struct SelectedExecutorPluginMcpContributor {
plugin_provider: ExecutorPluginProvider,
mcp_provider: ExecutorPluginMcpProvider,
connector_provider: ExecutorPluginConnectorProvider,
}
impl SelectedExecutorPluginMcpContributor {
@@ -46,46 +49,87 @@ impl SelectedExecutorPluginMcpContributor {
Self {
plugin_provider: ExecutorPluginProvider::new(Arc::clone(&environment_manager)),
mcp_provider: ExecutorPluginMcpProvider,
connector_provider: ExecutorPluginConnectorProvider,
}
}
async fn resolve_snapshot(
/// Returns metadata for one stable selected root.
///
/// Successful resolution, including a root that is not a plugin or declares no capabilities,
/// is cached until the thread state is dropped. Environment availability never invalidates
/// this cache; it only controls whether the cached metadata is projected into a model step.
async fn metadata_for_root(
&self,
selected_roots: &[SelectedCapabilityRoot],
) -> Vec<SelectedPluginMcpServers> {
let mut snapshot = Vec::new();
state: &SelectedExecutorPluginMcpState,
selected_root: &SelectedCapabilityRoot,
) -> Option<SelectedPluginMetadata> {
if let Some(cached) = state
.cache
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.iter()
.find(|cached| cached.root == *selected_root)
{
return cached.metadata.clone();
}
for (selection_order, selected_root) in selected_roots.iter().enumerate() {
let plugin = match self.plugin_provider.resolve_bound(selected_root).await {
Ok(Some(plugin)) => plugin,
Ok(None) => continue,
Err(err) => {
tracing::warn!(
selected_root = selected_root.id,
error = %err,
"failed to resolve selected executor plugin for MCP discovery"
);
continue;
}
};
match self.mcp_provider.load(&plugin).await {
Ok(servers) => snapshot.push(SelectedPluginMcpServers {
plugin_id: plugin.plugin().selected_root_id().to_string(),
plugin_display_name: plugin.plugin().manifest().display_name().to_string(),
selection_order,
servers,
}),
Err(err) => {
let plugin = match self.plugin_provider.resolve_bound(selected_root).await {
Ok(plugin) => plugin,
Err(err) => {
tracing::warn!(
selected_root = selected_root.id,
error = %err,
"failed to resolve selected executor plugin"
);
return None;
}
};
let metadata = match plugin {
Some(plugin) => {
let servers = self.mcp_provider.load(&plugin).await.unwrap_or_else(|err| {
tracing::warn!(
selected_root = selected_root.id,
error = %err,
"failed to load selected executor plugin MCP servers"
);
}
Vec::new()
});
let connector_ids = self
.connector_provider
.load(&plugin)
.await
.unwrap_or_else(|err| {
tracing::warn!(
selected_root = selected_root.id,
error = %err,
"failed to load selected executor plugin connectors"
);
Vec::new()
})
.into_iter()
.map(|declaration| declaration.connector_id.0)
.collect();
Some(SelectedPluginMetadata {
plugin_id: plugin.plugin().selected_root_id().to_string(),
plugin_display_name: plugin.plugin().manifest().display_name().to_string(),
servers,
connector_ids,
})
}
None => None,
};
let mut cache = state
.cache
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(cached) = cache.iter().find(|cached| cached.root == *selected_root) {
return cached.metadata.clone();
}
snapshot
cache.push(CachedSelectedRoot {
root: selected_root.clone(),
metadata: metadata.clone(),
});
metadata
}
}
@@ -102,20 +146,31 @@ impl McpServerContributor<Config> for SelectedExecutorPluginMcpContributor {
let Some(thread_init) = context.thread_init() else {
return Vec::new();
};
let Some(thread_store) = context.thread_store() else {
return Vec::new();
};
let Some(selected_roots) = thread_init.get::<Vec<SelectedCapabilityRoot>>() else {
return Vec::new();
};
let Some(state) = thread_init.get::<SelectedExecutorPluginMcpState>() else {
tracing::warn!("selected executor plugin MCP state was not initialized");
return Vec::new();
};
let snapshot = state
.snapshot
.get_or_init(|| self.resolve_snapshot(selected_roots.as_ref()))
.await;
let state = thread_store.get_or_init(SelectedExecutorPluginMcpState::default);
let mut contributions = Vec::new();
for plugin in snapshot {
for (selection_order, selected_root) in selected_roots.iter().enumerate() {
let CapabilityRootLocation::Environment { environment_id, .. } =
&selected_root.location;
if context
.available_environment_ids()
.is_some_and(|available| {
!available
.iter()
.any(|available| available == environment_id)
})
{
continue;
}
let Some(plugin) = self.metadata_for_root(&state, selected_root).await else {
continue;
};
let mut servers = plugin.servers.iter().cloned().collect::<HashMap<_, _>>();
context
.config()
@@ -127,10 +182,17 @@ impl McpServerContributor<Config> for SelectedExecutorPluginMcpContributor {
name,
plugin_id: plugin.plugin_id.clone(),
plugin_display_name: plugin.plugin_display_name.clone(),
selection_order: plugin.selection_order,
selection_order,
config: Box::new(config),
}
}));
if !plugin.connector_ids.is_empty() {
contributions.push(McpServerContribution::SelectedPluginConnectors {
plugin_id: plugin.plugin_id,
plugin_display_name: plugin.plugin_display_name,
connector_ids: plugin.connector_ids,
});
}
}
contributions
-7
View File
@@ -51,10 +51,3 @@ pub fn install_executor_plugins(
executor_plugin::SelectedExecutorPluginMcpContributor::new(environment_manager),
));
}
/// Seeds the per-thread snapshot used by selected executor plugin MCP discovery.
pub fn initialize_executor_plugin_thread_data(
thread_init: &mut codex_extension_api::ExtensionDataInit,
) {
executor_plugin::seed_thread_state(thread_init);
}
@@ -3,6 +3,7 @@ use codex_core::config::Config;
use codex_core::config::ConfigBuilder;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::LOCAL_ENVIRONMENT_ID;
use codex_extension_api::ExtensionData;
use codex_extension_api::ExtensionDataInit;
use codex_extension_api::ExtensionRegistryBuilder;
use codex_extension_api::McpServerContribution;
@@ -109,12 +110,15 @@ async fn selected_plugin_contributions(
path: PathUri::from_host_native_path(plugin_root)?,
},
}]);
codex_mcp_extension::initialize_executor_plugin_thread_data(&mut thread_init);
let thread_store = ExtensionData::new_with_init("test-thread", thread_init.clone());
let available_environment_ids = vec![LOCAL_ENVIRONMENT_ID.to_string()];
Ok(registry.mcp_server_contributors()[0]
.contribute(McpServerContributionContext::for_thread(
.contribute(McpServerContributionContext::for_step(
config,
&thread_init,
&thread_store,
&available_environment_ids,
))
.await
.into_iter()
@@ -132,7 +136,9 @@ async fn selected_plugin_contributions(
selection_order,
enabled: config.enabled,
},
McpServerContribution::Set { .. } | McpServerContribution::Remove { .. } => {
McpServerContribution::Set { .. }
| McpServerContribution::SelectedPluginConnectors { .. }
| McpServerContribution::Remove { .. } => {
panic!("expected selected plugin contribution")
}
})