feat: Use remote installed plugin cache for skills and MCP (#20096)

- Fetches and caches remote /installed plugin state
- Lets skills/list load skills from remote-installed cached plugins
without requiring a local marketplace entry
- Routes plugin list/startup/install/uninstall changes through async
plugin cache invalidation and MCP refresh
This commit is contained in:
xl-openai
2026-04-29 12:09:49 -07:00
committed by GitHub
Unverified
parent 5cf0adba93
commit 73cd831952
9 changed files with 751 additions and 54 deletions
@@ -723,6 +723,35 @@ impl CodexMessageProcessor {
self.clear_plugin_related_caches();
}
pub(crate) fn effective_plugins_changed_callback(
&self,
config: Config,
) -> Arc<dyn Fn() + Send + Sync> {
let thread_manager = Arc::clone(&self.thread_manager);
Arc::new(move || {
Self::spawn_effective_plugins_changed_task(Arc::clone(&thread_manager), config.clone());
})
}
fn on_effective_plugins_changed(&self, config: Config) {
Self::spawn_effective_plugins_changed_task(Arc::clone(&self.thread_manager), config);
}
fn spawn_effective_plugins_changed_task(thread_manager: Arc<ThreadManager>, config: Config) {
tokio::spawn(async move {
thread_manager.plugins_manager().clear_cache();
thread_manager.skills_manager().clear_cache();
if thread_manager.list_thread_ids().await.is_empty() {
return;
}
if let Err(err) =
Self::queue_mcp_server_refresh_for_config(&thread_manager, &config).await
{
warn!("failed to queue MCP refresh after effective plugins changed: {err:?}");
}
});
}
fn clear_plugin_related_caches(&self) {
self.thread_manager.plugins_manager().clear_cache();
self.thread_manager.skills_manager().clear_cache();
@@ -5372,7 +5401,7 @@ impl CodexMessageProcessor {
async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) {
let result = async {
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
self.queue_mcp_server_refresh_for_config(&config).await?;
Self::queue_mcp_server_refresh_for_config(&self.thread_manager, &config).await?;
Ok::<_, JSONRPCErrorError>(McpServerRefreshResponse {})
}
.await;
@@ -5380,11 +5409,10 @@ impl CodexMessageProcessor {
}
async fn queue_mcp_server_refresh_for_config(
&self,
thread_manager: &Arc<ThreadManager>,
config: &Config,
) -> Result<(), JSONRPCErrorError> {
let configured_servers = self
.thread_manager
let configured_servers = thread_manager
.mcp_manager()
.configured_servers(config)
.await;
@@ -5420,7 +5448,6 @@ impl CodexMessageProcessor {
// Refresh requests are queued per thread; each thread rebuilds MCP connections on its next
// active turn to avoid work for threads that never resume.
let thread_manager = Arc::clone(&self.thread_manager);
thread_manager.refresh_mcp_servers(refresh_config).await;
Ok(())
}
@@ -6269,12 +6296,13 @@ impl CodexMessageProcessor {
continue;
}
};
let effective_skill_roots = plugins_manager
.effective_skill_roots_for_layer_stack(
&config_layer_stack,
config.features.enabled(Feature::Plugins) && workspace_codex_plugins_enabled,
)
.await;
let effective_skill_roots = if workspace_codex_plugins_enabled {
plugins_manager
.effective_skill_roots_for_layer_stack(&config_layer_stack, &config)
.await
} else {
Vec::new()
};
let skills_input = codex_core::skills::SkillsLoadInput::new(
cwd_abs.clone(),
effective_skill_roots,
@@ -37,7 +37,12 @@ impl CodexMessageProcessor {
{
return Ok(empty_response());
}
plugins_manager.maybe_start_non_curated_plugin_cache_refresh(&roots);
plugins_manager.maybe_start_plugin_list_background_tasks_for_config(
&config,
auth.clone(),
&roots,
Some(self.effective_plugins_changed_callback(config.clone())),
);
let config_for_marketplace_listing = config.clone();
let plugins_manager_for_marketplace_listing = plugins_manager.clone();
@@ -362,17 +367,10 @@ impl CodexMessageProcessor {
}
};
self.clear_plugin_related_caches();
self.on_effective_plugins_changed(config.clone());
let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await;
if !plugin_mcp_servers.is_empty() {
if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await {
warn!(
plugin = result.plugin_id.as_key(),
"failed to queue MCP refresh after plugin install: {err:?}"
);
}
self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers)
.await;
}
@@ -464,19 +462,16 @@ impl CodexMessageProcessor {
.await
.map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"))?;
// TODO(remote plugins): remote marketplaces do not yet have a local
// marketplace/read-path sync, so this install path reads MCP/apps directly
// from the just-cached bundle.
self.clear_plugin_related_caches();
self.thread_manager
.plugins_manager()
.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
&config,
auth.clone(),
Some(self.effective_plugins_changed_callback(config.clone())),
);
let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await;
if !plugin_mcp_servers.is_empty() {
if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await {
warn!(
plugin = result.plugin_id.as_key(),
"failed to queue MCP refresh after remote plugin install: {err:?}"
);
}
self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers)
.await;
}
@@ -591,7 +586,15 @@ impl CodexMessageProcessor {
.uninstall_plugin(plugin_id)
.await
.map_err(Self::plugin_uninstall_error)?;
self.clear_plugin_related_caches();
match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => self.on_effective_plugins_changed(config),
Err(err) => {
warn!(
"failed to reload config after plugin uninstall, clearing plugin-related caches only: {err:?}"
);
self.clear_plugin_related_caches();
}
}
Ok(PluginUninstallResponse {})
}
@@ -675,16 +678,32 @@ impl CodexMessageProcessor {
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
codex_core_plugins::remote::uninstall_remote_plugin(
let uninstall_result = codex_core_plugins::remote::uninstall_remote_plugin(
&remote_plugin_service_config,
auth.as_ref(),
config.codex_home.to_path_buf(),
&plugin_id,
)
.await
.map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "uninstall remote plugin"))?;
.await;
self.clear_plugin_related_caches();
if matches!(
&uninstall_result,
Ok(()) | Err(RemotePluginCatalogError::CacheRemove(_))
) {
let plugins_manager = self.thread_manager.plugins_manager();
if plugins_manager.clear_remote_installed_plugins_cache() {
self.on_effective_plugins_changed(config.clone());
}
plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
&config,
auth.clone(),
Some(self.effective_plugins_changed_callback(config.clone())),
);
}
uninstall_result.map_err(|err| {
remote_plugin_catalog_error_to_jsonrpc(err, "uninstall remote plugin")
})?;
Ok(PluginUninstallResponse {})
}
}
+7 -2
View File
@@ -317,10 +317,15 @@ impl MessageProcessor {
});
if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) {
// Keep plugin startup warmups aligned at app-server startup.
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
let on_effective_plugins_changed =
codex_message_processor.effective_plugins_changed_callback((*config).clone());
thread_manager
.plugins_manager()
.maybe_start_plugin_startup_tasks_for_config(&config, auth_manager.clone());
.maybe_start_plugin_startup_tasks_for_config(
&config,
auth_manager.clone(),
Some(on_effective_plugins_changed),
);
}
let config_api = ConfigApi::new(
config_manager,
@@ -7,6 +7,8 @@ use app_test_support::McpProcess;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::PluginListParams;
use codex_app_server_protocol::PluginListResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SkillsChangedNotification;
use codex_app_server_protocol::SkillsListExtraRootsForCwd;
@@ -24,6 +26,7 @@ use wiremock::ResponseTemplate;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::query_param;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const WATCHER_TIMEOUT: Duration = Duration::from_secs(20);
@@ -52,6 +55,23 @@ plugins = true
)
}
fn write_remote_plugins_enabled_config_with_base_url(
codex_home: &std::path::Path,
base_url: &str,
) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"chatgpt_base_url = "{base_url}"
[features]
plugins = true
remote_plugin = true
"#,
),
)
}
fn write_plugin_with_skill(
repo_root: &std::path::Path,
plugin_name: &str,
@@ -93,6 +113,26 @@ fn write_plugin_with_skill(
Ok(())
}
fn write_cached_remote_plugin_with_skill(
codex_home: &std::path::Path,
) -> Result<std::path::PathBuf> {
let plugin_root = codex_home.join("plugins/cache/chatgpt-global/linear/local");
std::fs::create_dir_all(plugin_root.join(".codex-plugin"))?;
std::fs::write(
plugin_root.join(".codex-plugin/plugin.json"),
r#"{"name":"linear"}"#,
)?;
let skill_dir = plugin_root.join("skills/triage-issues");
std::fs::create_dir_all(&skill_dir)?;
let skill_path = skill_dir.join("SKILL.md");
std::fs::write(
&skill_path,
"---\nname: triage-issues\ndescription: Triage Linear issues\n---\n\n# Body\n",
)?;
Ok(skill_path)
}
#[tokio::test]
async fn skills_list_includes_skills_from_per_cwd_extra_user_roots() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -131,6 +171,186 @@ async fn skills_list_includes_skills_from_per_cwd_extra_user_roots() -> Result<(
Ok(())
}
#[tokio::test]
async fn skills_list_loads_remote_installed_plugin_skills_from_cache() -> Result<()> {
let codex_home = TempDir::new()?;
let cwd = TempDir::new()?;
let server = MockServer::start().await;
let expected_skill_path =
std::fs::canonicalize(write_cached_remote_plugin_with_skill(codex_home.path())?)?;
write_remote_plugins_enabled_config_with_base_url(
codex_home.path(),
&format!("{}/backend-api/", server.uri()),
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let global_directory_body = r#"{
"plugins": [
{
"id": "plugins~Plugin_linear",
"name": "linear",
"scope": "GLOBAL",
"installation_policy": "AVAILABLE",
"authentication_policy": "ON_USE",
"release": {
"display_name": "Linear",
"description": "Track work in Linear",
"app_ids": [],
"interface": {},
"skills": []
}
}
],
"pagination": {
"limit": 50,
"next_page_token": null
}
}"#;
let global_installed_body = r#"{
"plugins": [
{
"id": "plugins~Plugin_linear",
"name": "linear",
"scope": "GLOBAL",
"installation_policy": "AVAILABLE",
"authentication_policy": "ON_USE",
"release": {
"display_name": "Linear",
"description": "Track work in Linear",
"app_ids": [],
"interface": {},
"skills": []
},
"enabled": true,
"disabled_skill_names": []
}
],
"pagination": {
"limit": 50,
"next_page_token": null
}
}"#;
let empty_page_body = r#"{
"plugins": [],
"pagination": {
"limit": 50,
"next_page_token": null
}
}"#;
for (scope, body) in [
("GLOBAL", global_directory_body),
("WORKSPACE", empty_page_body),
] {
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/list"))
.and(query_param("scope", scope))
.and(query_param("limit", "200"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
}
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let stale_skills_list_request_id = mcp
.send_skills_list_request(SkillsListParams {
cwds: vec![cwd.path().to_path_buf()],
force_reload: true,
per_cwd_extra_user_roots: None,
})
.await?;
let stale_skills_list_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(stale_skills_list_request_id)),
)
.await??;
let SkillsListResponse { data } = to_response(stale_skills_list_response)?;
assert_eq!(data.len(), 1);
assert!(
data[0]
.skills
.iter()
.all(|skill| skill.name != "linear:triage-issues"),
"remote installed plugin cache has not been refreshed yet"
);
for (scope, body) in [
("GLOBAL", global_installed_body),
("WORKSPACE", empty_page_body),
] {
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/installed"))
.and(query_param("scope", scope))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
}
let plugin_list_request_id = mcp
.send_plugin_list_request(PluginListParams { cwds: None })
.await?;
let plugin_list_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(plugin_list_request_id)),
)
.await??;
let _: PluginListResponse = to_response(plugin_list_response)?;
let SkillsListResponse { data } = timeout(DEFAULT_TIMEOUT, async {
loop {
let skills_list_request_id = mcp
.send_skills_list_request(SkillsListParams {
cwds: vec![cwd.path().to_path_buf()],
force_reload: false,
per_cwd_extra_user_roots: None,
})
.await?;
let skills_list_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(skills_list_request_id)),
)
.await??;
let response: SkillsListResponse = to_response(skills_list_response)?;
if response.data.iter().any(|entry| {
entry
.skills
.iter()
.any(|skill| skill.name == "linear:triage-issues")
}) {
break Ok::<SkillsListResponse, anyhow::Error>(response);
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
})
.await??;
assert_eq!(data.len(), 1);
assert_eq!(data[0].errors, Vec::new());
let skill = data[0]
.skills
.iter()
.find(|skill| skill.name == "linear:triage-issues")
.expect("expected skill from cached remote plugin");
assert_eq!(
std::fs::canonicalize(skill.path.as_path())?,
expected_skill_path
);
assert_eq!(skill.enabled, true);
Ok(())
}
#[tokio::test]
async fn skills_list_excludes_plugin_skills_when_workspace_codex_plugins_disabled() -> Result<()> {
let codex_home = TempDir::new()?;
+39 -3
View File
@@ -5,6 +5,7 @@ use crate::manifest::load_plugin_manifest;
use crate::marketplace::MarketplacePluginSource;
use crate::marketplace::list_marketplaces;
use crate::marketplace::load_marketplace;
use crate::remote::RemoteInstalledPlugin;
use crate::store::PluginStore;
use crate::store::plugin_version_for_source;
use codex_config::ConfigLayerStack;
@@ -107,13 +108,14 @@ struct PluginAppConfig {
pub async fn load_plugins_from_layer_stack(
config_layer_stack: &ConfigLayerStack,
extra_plugins: HashMap<String, PluginConfig>,
store: &PluginStore,
restriction_product: Option<Product>,
) -> PluginLoadOutcome<McpServerConfig> {
let skill_config_rules = skill_config_rules_from_stack(config_layer_stack);
let mut configured_plugins: Vec<_> = configured_plugins_from_stack(config_layer_stack)
.into_iter()
.collect();
let mut configured_plugins = configured_plugins_from_stack(config_layer_stack);
configured_plugins.extend(extra_plugins);
let mut configured_plugins: Vec<_> = configured_plugins.into_iter().collect();
configured_plugins.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
let mut plugins = Vec::with_capacity(configured_plugins.len());
@@ -145,6 +147,40 @@ pub async fn load_plugins_from_layer_stack(
PluginLoadOutcome::from_plugins(plugins)
}
pub fn remote_installed_plugins_to_config(
plugins: &[RemoteInstalledPlugin],
store: &PluginStore,
) -> HashMap<String, PluginConfig> {
plugins
.iter()
.filter_map(|plugin| {
let plugin_id =
match PluginId::new(plugin.name.clone(), plugin.marketplace_name.clone()) {
Ok(plugin_id) => plugin_id,
Err(err) => {
warn!(
plugin = %plugin.name,
remote_id = %plugin.id,
error = %err,
"ignoring invalid remote installed plugin name"
);
return None;
}
};
// TODO(remote plugins): download or update missing local bundles during remote
// installed reconciliation. Until then, only publish remote installed state for
// bundles already present in the local plugin cache.
store.active_plugin_root(&plugin_id)?;
Some((
plugin_id.as_key(),
PluginConfig {
enabled: plugin.enabled,
},
))
})
.collect()
}
pub fn refresh_curated_plugin_cache(
codex_home: &Path,
plugin_version: &str,
+57
View File
@@ -39,6 +39,14 @@ pub struct RemoteMarketplace {
pub plugins: Vec<RemotePluginSummary>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct RemoteInstalledPlugin {
pub marketplace_name: String,
pub id: String,
pub name: String,
pub enabled: bool,
}
#[derive(Debug, Clone, PartialEq)]
pub struct RemotePluginSummary {
pub id: String,
@@ -369,6 +377,39 @@ pub async fn fetch_remote_marketplaces(
Ok(marketplaces)
}
pub async fn fetch_remote_installed_plugins(
config: &RemotePluginServiceConfig,
auth: Option<&CodexAuth>,
) -> Result<Vec<RemoteInstalledPlugin>, RemotePluginCatalogError> {
let auth = ensure_chatgpt_auth(auth)?;
let global = async {
let scope = RemotePluginScope::Global;
let installed_plugins = fetch_installed_plugins_for_scope(config, auth, scope).await?;
Ok::<_, RemotePluginCatalogError>((scope, installed_plugins))
};
let workspace = async {
let scope = RemotePluginScope::Workspace;
let installed_plugins = fetch_installed_plugins_for_scope(config, auth, scope).await?;
Ok::<_, RemotePluginCatalogError>((scope, installed_plugins))
};
let (global, workspace) = tokio::try_join!(global, workspace)?;
let mut installed_plugins = [global, workspace]
.into_iter()
.flat_map(|(scope, plugins)| {
plugins
.into_iter()
.map(move |plugin| remote_installed_plugin_to_info(scope, &plugin))
})
.collect::<Vec<_>>();
installed_plugins.sort_by(|left, right| {
left.marketplace_name
.cmp(&right.marketplace_name)
.then_with(|| left.id.cmp(&right.id))
});
Ok(installed_plugins)
}
pub async fn fetch_remote_plugin_detail(
config: &RemotePluginServiceConfig,
auth: Option<&CodexAuth>,
@@ -671,6 +712,22 @@ fn build_remote_plugin_summary(
}
}
fn remote_installed_plugin_to_info(
scope: RemotePluginScope,
installed_plugin: &RemotePluginInstalledItem,
) -> RemoteInstalledPlugin {
let plugin = &installed_plugin.plugin;
// Remote per-skill disabled state (`disabled_skill_names`) is intentionally
// not projected into skills/list yet; local skills.config remains the
// supported source for skill enablement.
RemoteInstalledPlugin {
marketplace_name: scope.marketplace_name().to_string(),
id: plugin.id.clone(),
name: plugin.name.clone(),
enabled: installed_plugin.enabled,
}
}
fn remote_plugin_interface_to_info(plugin: &RemotePluginDirectoryItem) -> Option<PluginInterface> {
let interface = &plugin.release.interface;
let display_name = non_empty_string(Some(&plugin.release.display_name));
+282 -9
View File
@@ -22,6 +22,7 @@ use codex_core_plugins::loader::plugin_telemetry_metadata_from_root;
use codex_core_plugins::loader::refresh_curated_plugin_cache;
use codex_core_plugins::loader::refresh_non_curated_plugin_cache;
use codex_core_plugins::loader::refresh_non_curated_plugin_cache_force_reinstall;
use codex_core_plugins::loader::remote_installed_plugins_to_config;
use codex_core_plugins::manifest::PluginManifestInterface;
use codex_core_plugins::manifest::load_plugin_manifest;
use codex_core_plugins::marketplace::MarketplaceError;
@@ -40,6 +41,8 @@ use codex_core_plugins::marketplace_upgrade::ConfiguredMarketplaceUpgradeError;
use codex_core_plugins::marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome;
use codex_core_plugins::marketplace_upgrade::configured_git_marketplace_names;
use codex_core_plugins::marketplace_upgrade::upgrade_configured_git_marketplaces;
use codex_core_plugins::remote::RemoteInstalledPlugin;
use codex_core_plugins::remote::RemotePluginCatalogError;
use codex_core_plugins::remote::RemotePluginServiceConfig;
use codex_core_plugins::remote_legacy::RemotePluginFetchError;
use codex_core_plugins::remote_legacy::RemotePluginMutationError;
@@ -91,6 +94,30 @@ struct CachedFeaturedPluginIds {
featured_plugin_ids: Vec<String>,
}
struct RemoteInstalledPluginsCacheRefreshRequest {
service_config: RemotePluginServiceConfig,
auth: Option<CodexAuth>,
notify: RemoteInstalledPluginsCacheRefreshNotify,
// App-server attaches side effects such as skills metadata invalidation and MCP refreshes when
// remote installed state changes.
on_effective_plugins_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
}
#[derive(Clone, Copy)]
enum RemoteInstalledPluginsCacheRefreshNotify {
IfCacheChanged,
// Remote mutations may change local bundles or active MCP state even when the installed set is
// unchanged. Notify after `/installed` succeeds so MCP refreshes are ordered after the remote
// installed cache.
AfterSuccessfulRefresh,
}
#[derive(Default)]
struct RemoteInstalledPluginsCacheRefreshState {
requested: Option<RemoteInstalledPluginsCacheRefreshRequest>,
in_flight: bool,
}
#[derive(Clone, PartialEq, Eq)]
struct NonCuratedCacheRefreshRequest {
roots: Vec<AbsolutePathBuf>,
@@ -333,6 +360,10 @@ pub struct PluginsManager {
configured_marketplace_upgrade_state: RwLock<ConfiguredMarketplaceUpgradeState>,
non_curated_cache_refresh_state: RwLock<NonCuratedCacheRefreshState>,
cached_enabled_outcome: RwLock<Option<PluginLoadOutcome>>,
// TODO(remote plugins): reset this cache when ChatGPT auth/account state changes so stale
// remote installed state cannot remain effective for a different account.
remote_installed_plugins_cache: RwLock<Option<Vec<RemoteInstalledPlugin>>>,
remote_installed_plugins_cache_refresh_state: RwLock<RemoteInstalledPluginsCacheRefreshState>,
remote_sync_lock: Semaphore,
restriction_product: Option<Product>,
analytics_events_client: RwLock<Option<AnalyticsEventsClient>>,
@@ -363,6 +394,10 @@ impl PluginsManager {
),
non_curated_cache_refresh_state: RwLock::new(NonCuratedCacheRefreshState::default()),
cached_enabled_outcome: RwLock::new(None),
remote_installed_plugins_cache: RwLock::new(None),
remote_installed_plugins_cache_refresh_state: RwLock::new(
RemoteInstalledPluginsCacheRefreshState::default(),
),
remote_sync_lock: Semaphore::new(/*permits*/ 1),
restriction_product,
analytics_events_client: RwLock::new(None),
@@ -407,6 +442,7 @@ impl PluginsManager {
let outcome = load_plugins_from_layer_stack(
&config.config_layer_stack,
self.remote_installed_plugin_configs(config),
&self.store,
self.restriction_product,
)
@@ -421,15 +457,19 @@ impl PluginsManager {
}
pub fn clear_cache(&self) {
let mut cached_enabled_outcome = match self.cached_enabled_outcome.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
self.clear_enabled_outcome_cache();
let mut featured_plugin_ids_cache = match self.featured_plugin_ids_cache.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
*featured_plugin_ids_cache = None;
}
fn clear_enabled_outcome_cache(&self) {
let mut cached_enabled_outcome = match self.cached_enabled_outcome.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
*cached_enabled_outcome = None;
}
@@ -437,14 +477,19 @@ impl PluginsManager {
pub async fn effective_skill_roots_for_layer_stack(
&self,
config_layer_stack: &ConfigLayerStack,
plugins_feature_enabled: bool,
config: &Config,
) -> Vec<AbsolutePathBuf> {
if !plugins_feature_enabled {
if !config.features.enabled(Feature::Plugins) {
return Vec::new();
}
load_plugins_from_layer_stack(config_layer_stack, &self.store, self.restriction_product)
.await
.effective_skill_roots()
load_plugins_from_layer_stack(
config_layer_stack,
self.remote_installed_plugin_configs(config),
&self.store,
self.restriction_product,
)
.await
.effective_skill_roots()
}
fn cached_enabled_outcome(&self) -> Option<PluginLoadOutcome> {
@@ -454,6 +499,116 @@ impl PluginsManager {
}
}
fn remote_installed_plugin_configs(&self, config: &Config) -> HashMap<String, PluginConfig> {
if !config.features.enabled(Feature::RemotePlugin) {
return HashMap::new();
}
let cache = match self.remote_installed_plugins_cache.read() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
let Some(plugins) = cache.as_ref() else {
return HashMap::new();
};
remote_installed_plugins_to_config(plugins, &self.store)
}
fn write_remote_installed_plugins_cache(&self, plugins: Vec<RemoteInstalledPlugin>) -> bool {
let mut cache = match self.remote_installed_plugins_cache.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
if cache.as_ref().is_some_and(|cache| cache.eq(&plugins)) {
return false;
}
*cache = Some(plugins);
drop(cache);
self.clear_enabled_outcome_cache();
true
}
pub fn clear_remote_installed_plugins_cache(&self) -> bool {
let mut cache = match self.remote_installed_plugins_cache.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
if cache.is_none() {
return false;
}
*cache = None;
drop(cache);
self.clear_enabled_outcome_cache();
true
}
pub fn maybe_start_remote_installed_plugins_cache_refresh(
self: &Arc<Self>,
config: &Config,
auth: Option<CodexAuth>,
on_effective_plugins_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
) {
self.maybe_start_remote_installed_plugins_cache_refresh_with_notify(
config,
auth,
RemoteInstalledPluginsCacheRefreshNotify::IfCacheChanged,
on_effective_plugins_changed,
);
}
pub fn maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
self: &Arc<Self>,
config: &Config,
auth: Option<CodexAuth>,
on_effective_plugins_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
) {
self.maybe_start_remote_installed_plugins_cache_refresh_with_notify(
config,
auth,
RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh,
on_effective_plugins_changed,
);
}
fn maybe_start_remote_installed_plugins_cache_refresh_with_notify(
self: &Arc<Self>,
config: &Config,
auth: Option<CodexAuth>,
notify: RemoteInstalledPluginsCacheRefreshNotify,
on_effective_plugins_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
) {
if !config.features.enabled(Feature::Plugins)
|| !config.features.enabled(Feature::RemotePlugin)
{
return;
}
self.schedule_remote_installed_plugins_cache_refresh(
RemoteInstalledPluginsCacheRefreshRequest {
service_config: remote_plugin_service_config(config),
auth,
notify,
on_effective_plugins_changed,
},
);
}
pub fn maybe_start_plugin_list_background_tasks_for_config(
self: &Arc<Self>,
config: &Config,
auth: Option<CodexAuth>,
roots: &[AbsolutePathBuf],
on_effective_plugins_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
) {
self.maybe_start_non_curated_plugin_cache_refresh(roots);
self.maybe_start_remote_installed_plugins_cache_refresh(
config,
auth,
on_effective_plugins_changed,
);
}
fn cached_featured_plugin_ids(
&self,
cache_key: &FeaturedPluginIdsCacheKey,
@@ -1128,6 +1283,7 @@ impl PluginsManager {
self: &Arc<Self>,
config: &Config,
auth_manager: Arc<AuthManager>,
on_effective_plugins_changed: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
) {
if config.features.enabled(Feature::Plugins) {
self.start_curated_repo_sync();
@@ -1189,6 +1345,21 @@ impl PluginsManager {
auth_manager.clone(),
);
if config.features.enabled(Feature::RemotePlugin) {
let config = config.clone();
let manager = Arc::clone(self);
let auth_manager = auth_manager.clone();
let on_effective_plugins_changed = on_effective_plugins_changed.clone();
tokio::spawn(async move {
let auth = auth_manager.auth().await;
manager.maybe_start_remote_installed_plugins_cache_refresh(
&config,
auth,
on_effective_plugins_changed,
);
});
}
let config = config.clone();
let manager = Arc::clone(self);
tokio::spawn(async move {
@@ -1262,6 +1433,48 @@ impl PluginsManager {
);
}
fn schedule_remote_installed_plugins_cache_refresh(
self: &Arc<Self>,
mut request: RemoteInstalledPluginsCacheRefreshRequest,
) {
let should_spawn = {
let mut state = match self.remote_installed_plugins_cache_refresh_state.write() {
Ok(state) => state,
Err(err) => err.into_inner(),
};
if let Some(existing_request) = state.requested.as_ref() {
if matches!(
existing_request.notify,
RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh
) {
request.notify =
RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh;
}
if request.on_effective_plugins_changed.is_none() {
request.on_effective_plugins_changed =
existing_request.on_effective_plugins_changed.clone();
}
}
state.requested = Some(request);
if state.in_flight {
false
} else {
state.in_flight = true;
true
}
};
if !should_spawn {
return;
}
let manager = Arc::clone(self);
tokio::spawn(async move {
manager
.run_remote_installed_plugins_cache_refresh_loop()
.await;
});
}
fn schedule_non_curated_plugin_cache_refresh(
self: &Arc<Self>,
roots: &[AbsolutePathBuf],
@@ -1368,6 +1581,66 @@ impl PluginsManager {
}
}
async fn run_remote_installed_plugins_cache_refresh_loop(self: Arc<Self>) {
loop {
let request = {
let mut state = match self.remote_installed_plugins_cache_refresh_state.write() {
Ok(state) => state,
Err(err) => err.into_inner(),
};
match state.requested.take() {
Some(request) => request,
None => {
state.in_flight = false;
return;
}
}
};
let installed_plugins = codex_core_plugins::remote::fetch_remote_installed_plugins(
&request.service_config,
request.auth.as_ref(),
)
.await;
match installed_plugins {
Ok(installed_plugins) => {
// TODO(remote plugins): reconcile missing or stale local bundles before
// publishing remote installed state as effective local plugin config.
let changed = self.write_remote_installed_plugins_cache(installed_plugins);
let should_notify = changed
|| matches!(
request.notify,
RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh
);
if should_notify
&& let Some(on_effective_plugins_changed) =
request.on_effective_plugins_changed
{
on_effective_plugins_changed();
}
}
Err(
RemotePluginCatalogError::AuthRequired
| RemotePluginCatalogError::UnsupportedAuthMode,
) => {
let changed = self.clear_remote_installed_plugins_cache();
if changed
&& let Some(on_effective_plugins_changed) =
request.on_effective_plugins_changed
{
on_effective_plugins_changed();
}
}
Err(err) => {
warn!(
error = %err,
"failed to refresh remote installed plugins cache"
);
}
}
}
}
fn run_non_curated_plugin_cache_refresh_loop(self: Arc<Self>) {
loop {
let request = {
@@ -16,6 +16,7 @@ use codex_config::ConfigRequirementsToml;
use codex_config::McpServerConfig;
use codex_config::types::McpServerTransportConfig;
use codex_core_plugins::installed_marketplaces::marketplace_install_root;
use codex_core_plugins::loader::load_plugins_from_layer_stack;
use codex_core_plugins::loader::refresh_non_curated_plugin_cache;
use codex_core_plugins::loader::refresh_non_curated_plugin_cache_force_reinstall;
use codex_core_plugins::marketplace::MarketplacePluginInstallPolicy;
@@ -246,6 +247,67 @@ async fn load_plugins_loads_default_skills_and_mcp_servers() {
);
}
#[tokio::test]
async fn remote_installed_cache_adds_plugin_skill_roots_without_marketplace_config() {
let codex_home = TempDir::new().unwrap();
let plugin_base = codex_home
.path()
.join("plugins/cache/chatgpt-global/linear");
write_plugin(&plugin_base, "local", "linear");
write_file(
&codex_home.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
remote_plugin = true
"#,
);
let config = load_config(codex_home.path(), codex_home.path()).await;
let manager = PluginsManager::new(codex_home.path().to_path_buf());
manager.write_remote_installed_plugins_cache(vec![
codex_core_plugins::remote::RemoteInstalledPlugin {
marketplace_name: "chatgpt-global".to_string(),
id: "plugins~Plugin_linear".to_string(),
name: "linear".to_string(),
enabled: true,
},
]);
let outcome = manager.plugins_for_config(&config).await;
assert_eq!(
outcome.effective_skill_roots(),
vec![AbsolutePathBuf::try_from(plugin_base.join("local/skills")).unwrap()]
);
assert_eq!(outcome.plugins().len(), 1);
assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global");
}
#[tokio::test]
async fn remote_installed_cache_ignores_plugins_missing_local_cache() {
let codex_home = TempDir::new().unwrap();
write_file(
&codex_home.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
remote_plugin = true
"#,
);
let config = load_config(codex_home.path(), codex_home.path()).await;
let manager = PluginsManager::new(codex_home.path().to_path_buf());
manager.write_remote_installed_plugins_cache(vec![
codex_core_plugins::remote::RemoteInstalledPlugin {
marketplace_name: "chatgpt-global".to_string(),
id: "plugins~Plugin_linear".to_string(),
name: "linear".to_string(),
enabled: true,
},
]);
let outcome = manager.plugins_for_config(&config).await;
assert_eq!(outcome, PluginLoadOutcome::default());
}
#[tokio::test]
async fn load_plugins_resolves_disabled_skill_names_against_loaded_plugin_skills() {
let codex_home = TempDir::new().unwrap();
@@ -3300,6 +3362,7 @@ async fn load_plugins_ignores_project_config_files() {
let outcome = load_plugins_from_layer_stack(
&stack,
std::collections::HashMap::new(),
&PluginStore::new(codex_home.path().to_path_buf()),
Some(Product::Codex),
)
+1 -5
View File
@@ -23,7 +23,6 @@ use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_config::loader::load_config_layers_state;
use codex_exec_server::LOCAL_FS;
use codex_features::Feature;
use codex_utils_absolute_path::AbsolutePathBuf;
use crate::review_prompts::resolve_review_request;
@@ -619,10 +618,7 @@ pub async fn list_skills(sess: &Session, sub_id: String, cwds: Vec<PathBuf>, for
}
};
let effective_skill_roots = plugins_manager
.effective_skill_roots_for_layer_stack(
&config_layer_stack,
config.features.enabled(Feature::Plugins),
)
.effective_skill_roots_for_layer_stack(&config_layer_stack, &config)
.await;
let skills_input = crate::SkillsLoadInput::new(
cwd_abs.clone(),