[codex] Support remote plugin install writes (#18917)

## Summary
- Add a remote plugin install write call that POSTs the selected remote
plugin to the ChatGPT cloud plugin API.
- Align remote install with the latest remote read contract:
`pluginName` carries the backend remote plugin id directly, for example
`plugins~Plugin_linear`, and install no longer synthesizes
`<name>@<marketplace>` ids.
- Validate remote install ids with the same character rules as remote
read, return the same install response shape as local installs, and
include mocked app-server coverage for the write path.

## Validation
- `just fmt`
- `cargo test -p codex-app-server --test all plugin_install`
- `cargo test -p codex-core-plugins`
- `just fix -p codex-app-server`
- `just fix -p codex-core-plugins`
This commit is contained in:
xli-oai
2026-04-23 22:10:15 -07:00
committed by GitHub
Unverified
parent 19badb0be2
commit 33cc135cc3
3 changed files with 449 additions and 75 deletions
@@ -1,4 +1,5 @@
use super::*;
use codex_app_server_protocol::PluginInstallPolicy;
impl CodexMessageProcessor {
pub(super) async fn plugin_list(
@@ -358,17 +359,7 @@ impl CodexMessageProcessor {
let marketplace_path = match (marketplace_path, remote_marketplace_name) {
(Some(marketplace_path), None) => marketplace_path,
(None, Some(remote_marketplace_name)) => {
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"remote plugin install is not supported yet for marketplace {remote_marketplace_name}"
),
data: None,
},
)
self.remote_plugin_install(request_id, remote_marketplace_name, plugin_name)
.await;
return;
}
@@ -426,66 +417,14 @@ impl CodexMessageProcessor {
let plugin_apps = load_plugin_apps(result.installed_path.as_path()).await;
let auth = self.auth_manager.auth().await;
let apps_needing_auth = if plugin_apps.is_empty()
|| !config.features.apps_enabled_for_auth(
let apps_needing_auth = self
.plugin_apps_needing_auth_for_install(
&config,
auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth),
) {
Vec::new()
} else {
let environment_manager = self.thread_manager.environment_manager();
let (all_connectors_result, accessible_connectors_result) = tokio::join!(
connectors::list_all_connectors_with_options(&config, /*force_refetch*/ true),
connectors::list_accessible_connectors_from_mcp_tools_with_environment_manager(
&config, /*force_refetch*/ true, &environment_manager
),
);
let all_connectors = match all_connectors_result {
Ok(connectors) => connectors,
Err(err) => {
warn!(
plugin = result.plugin_id.as_key(),
"failed to load app metadata after plugin install: {err:#}"
);
connectors::list_cached_all_connectors(&config)
.await
.unwrap_or_default()
}
};
let all_connectors =
connectors::connectors_for_plugin_apps(all_connectors, &plugin_apps);
let (accessible_connectors, codex_apps_ready) =
match accessible_connectors_result {
Ok(status) => (status.connectors, status.codex_apps_ready),
Err(err) => {
warn!(
plugin = result.plugin_id.as_key(),
"failed to load accessible apps after plugin install: {err:#}"
);
(
connectors::list_cached_accessible_connectors_from_mcp_tools(
&config,
)
.await
.unwrap_or_default(),
false,
)
}
};
if !codex_apps_ready {
warn!(
plugin = result.plugin_id.as_key(),
"codex_apps MCP not ready after plugin install; skipping appsNeedingAuth check"
);
}
plugin_app_helpers::plugin_apps_needing_auth(
&all_connectors,
&accessible_connectors,
&result.plugin_id.as_key(),
&plugin_apps,
codex_apps_ready,
)
};
.await;
self.outgoing
.send_response(
@@ -542,6 +481,193 @@ impl CodexMessageProcessor {
}
}
async fn remote_plugin_install(
&self,
request_id: ConnectionRequestId,
remote_marketplace_name: String,
plugin_name: String,
) {
let config = match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => config,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
};
if !config.features.enabled(Feature::Plugins)
|| !config.features.enabled(Feature::RemotePlugin)
{
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"remote plugin install is not enabled for marketplace {remote_marketplace_name}"
),
data: None,
},
)
.await;
return;
}
if plugin_name.is_empty()
|| !plugin_name
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '~')
{
self.send_invalid_request_error(
request_id,
"invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed"
.to_string(),
)
.await;
return;
}
let auth = self.auth_manager.auth().await;
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
let remote_detail = match codex_core_plugins::remote::fetch_remote_plugin_detail(
&remote_plugin_service_config,
auth.as_ref(),
&remote_marketplace_name,
&plugin_name,
)
.await
{
Ok(remote_detail) => remote_detail,
Err(err) => {
self.outgoing
.send_error(
request_id,
remote_plugin_catalog_error_to_jsonrpc(
err,
"read remote plugin details before install",
),
)
.await;
return;
}
};
if remote_detail.summary.install_policy == PluginInstallPolicy::NotAvailable {
self.send_invalid_request_error(
request_id,
format!("remote plugin {plugin_name} is not available for install"),
)
.await;
return;
}
if let Err(err) = codex_core_plugins::remote::install_remote_plugin(
&remote_plugin_service_config,
auth.as_ref(),
&remote_marketplace_name,
&plugin_name,
)
.await
{
self.outgoing
.send_error(
request_id,
remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"),
)
.await;
return;
}
self.clear_plugin_related_caches();
let plugin_apps = remote_detail
.app_ids
.into_iter()
.map(codex_core::plugins::AppConnectorId)
.collect::<Vec<_>>();
let apps_needing_auth = self
.plugin_apps_needing_auth_for_install(
&config,
auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth),
&plugin_name,
&plugin_apps,
)
.await;
self.outgoing
.send_response(
request_id,
PluginInstallResponse {
auth_policy: remote_detail.summary.auth_policy,
apps_needing_auth,
},
)
.await;
}
async fn plugin_apps_needing_auth_for_install(
&self,
config: &Config,
is_chatgpt_auth: bool,
plugin_id: &str,
plugin_apps: &[codex_core::plugins::AppConnectorId],
) -> Vec<AppSummary> {
if plugin_apps.is_empty() || !config.features.apps_enabled_for_auth(is_chatgpt_auth) {
return Vec::new();
}
let environment_manager = self.thread_manager.environment_manager();
let (all_connectors_result, accessible_connectors_result) = tokio::join!(
connectors::list_all_connectors_with_options(config, /*force_refetch*/ true),
connectors::list_accessible_connectors_from_mcp_tools_with_environment_manager(
config,
/*force_refetch*/ true,
&environment_manager
),
);
let all_connectors = match all_connectors_result {
Ok(connectors) => connectors,
Err(err) => {
warn!(
plugin = plugin_id,
"failed to load app metadata after plugin install: {err:#}"
);
connectors::list_cached_all_connectors(config)
.await
.unwrap_or_default()
}
};
let all_connectors = connectors::connectors_for_plugin_apps(all_connectors, plugin_apps);
let (accessible_connectors, codex_apps_ready) = match accessible_connectors_result {
Ok(status) => (status.connectors, status.codex_apps_ready),
Err(err) => {
warn!(
plugin = plugin_id,
"failed to load accessible apps after plugin install: {err:#}"
);
(
connectors::list_cached_accessible_connectors_from_mcp_tools(config)
.await
.unwrap_or_default(),
false,
)
}
};
if !codex_apps_ready {
warn!(
plugin = plugin_id,
"codex_apps MCP not ready after plugin install; skipping appsNeedingAuth check"
);
}
plugin_app_helpers::plugin_apps_needing_auth(
&all_connectors,
&accessible_connectors,
plugin_apps,
codex_apps_ready,
)
}
pub(super) async fn plugin_uninstall(
&self,
request_id: ConnectionRequestId,
@@ -686,7 +812,9 @@ fn remote_plugin_catalog_error_to_jsonrpc(
RemotePluginCatalogError::AuthToken(_)
| RemotePluginCatalogError::Request { .. }
| RemotePluginCatalogError::UnexpectedStatus { .. }
| RemotePluginCatalogError::Decode { .. } => JSONRPCErrorError {
| RemotePluginCatalogError::Decode { .. }
| RemotePluginCatalogError::UnexpectedPluginId { .. }
| RemotePluginCatalogError::UnexpectedEnabledState { .. } => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("{context}: {err}"),
data: None,
@@ -4,6 +4,7 @@ use std::sync::Mutex as StdMutex;
use std::time::Duration;
use anyhow::Result;
use anyhow::bail;
use app_test_support::ChatGptAuthFixture;
use app_test_support::DEFAULT_CLIENT_NAME;
use app_test_support::McpProcess;
@@ -44,6 +45,13 @@ use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::query_param;
// Plugin install tests wait on connector discovery after the install response path
// starts, which is noticeably slower on Windows CI.
@@ -137,8 +145,7 @@ async fn plugin_install_rejects_multiple_install_sources() -> Result<()> {
}
#[tokio::test]
async fn plugin_install_rejects_remote_marketplace_until_remote_install_is_supported() -> Result<()>
{
async fn plugin_install_rejects_remote_marketplace_when_remote_plugin_is_disabled() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
@@ -146,8 +153,8 @@ async fn plugin_install_rejects_remote_marketplace_until_remote_install_is_suppo
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: None,
remote_marketplace_name: Some("openai-curated".to_string()),
plugin_name: "sample-plugin".to_string(),
remote_marketplace_name: Some("chatgpt-global".to_string()),
plugin_name: "plugins~Plugin_sample".to_string(),
})
.await?;
@@ -161,9 +168,143 @@ async fn plugin_install_rejects_remote_marketplace_until_remote_install_is_suppo
assert!(
err.error
.message
.contains("remote plugin install is not supported yet")
.contains("remote plugin install is not enabled")
);
assert!(err.error.message.contains("chatgpt-global"));
Ok(())
}
#[tokio::test]
async fn plugin_install_writes_remote_plugin_to_cloud_when_remote_plugin_enabled() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_remote_plugin_catalog_config(
codex_home.path(),
&format!("{}/backend-api/", server.uri()),
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let detail_body = r#"{
"id": "plugins~Plugin_linear",
"name": "linear",
"scope": "GLOBAL",
"installation_policy": "AVAILABLE",
"authentication_policy": "ON_USE",
"release": {
"display_name": "Linear",
"description": "Track work in Linear",
"app_ids": [],
"interface": {
"short_description": "Plan and track work"
},
"skills": []
}
}"#;
let empty_installed_body = r#"{
"plugins": [],
"pagination": {
"limit": 50,
"next_page_token": null
}
}"#;
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/plugins~Plugin_linear"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(detail_body))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/installed"))
.and(query_param("scope", "GLOBAL"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(empty_installed_body))
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path(
"/backend-api/ps/plugins/plugins~Plugin_linear/install",
))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(r#"{"id":"plugins~Plugin_linear","enabled":true}"#),
)
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: None,
remote_marketplace_name: Some("chatgpt-global".to_string()),
plugin_name: "plugins~Plugin_linear".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(
response,
PluginInstallResponse {
auth_policy: PluginAuthPolicy::OnUse,
apps_needing_auth: Vec::new(),
}
);
wait_for_remote_plugin_request_count(
&server,
"POST",
"/ps/plugins/plugins~Plugin_linear/install",
/*expected_count*/ 1,
)
.await?;
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_invalid_remote_plugin_name() -> Result<()> {
let codex_home = TempDir::new()?;
write_remote_plugin_catalog_config(codex_home.path(), "https://example.invalid/backend-api/")?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: None,
remote_marketplace_name: Some("chatgpt-global".to_string()),
plugin_name: "linear/../../oops".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("invalid remote plugin id"));
assert!(
err.error
.message
.contains("only ASCII letters, digits, `_`, `-`, and `~` are allowed")
);
assert!(err.error.message.contains("openai-curated"));
Ok(())
}
@@ -773,6 +914,56 @@ fn write_analytics_config(codex_home: &std::path::Path, base_url: &str) -> std::
)
}
fn write_remote_plugin_catalog_config(
codex_home: &std::path::Path,
base_url: &str,
) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
chatgpt_base_url = "{base_url}"
[features]
plugins = true
remote_plugin = true
"#
),
)
}
async fn wait_for_remote_plugin_request_count(
server: &MockServer,
method_name: &str,
path_suffix: &str,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
bail!("wiremock did not record requests");
};
let request_count = requests
.iter()
.filter(|request| {
request.method == method_name && request.url.path().ends_with(path_suffix)
})
.count();
if request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if request_count > expected_count {
bail!(
"expected exactly {expected_count} {method_name} {path_suffix} requests, got {request_count}"
);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
fn write_plugin_marketplace(
repo_root: &std::path::Path,
marketplace_name: &str,
+55
View File
@@ -107,6 +107,20 @@ pub enum RemotePluginCatalogError {
expected_marketplace_name: String,
actual_marketplace_name: String,
},
#[error(
"remote plugin install returned unexpected plugin id: expected `{expected}`, got `{actual}`"
)]
UnexpectedPluginId { expected: String, actual: String },
#[error(
"remote plugin install returned unexpected enabled state for `{plugin_id}`: expected {expected_enabled}, got {actual_enabled}"
)]
UnexpectedEnabledState {
plugin_id: String,
expected_enabled: bool,
actual_enabled: bool,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize)]
@@ -258,6 +272,12 @@ struct RemotePluginInstalledResponse {
pagination: RemotePluginPagination,
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
struct RemotePluginInstallResponse {
id: String,
enabled: bool,
}
pub async fn fetch_remote_marketplaces(
config: &RemotePluginServiceConfig,
auth: Option<&CodexAuth>,
@@ -418,6 +438,41 @@ pub async fn fetch_remote_plugin_detail(
})
}
pub async fn install_remote_plugin(
config: &RemotePluginServiceConfig,
auth: Option<&CodexAuth>,
marketplace_name: &str,
plugin_id: &str,
) -> Result<(), RemotePluginCatalogError> {
let auth = ensure_chatgpt_auth(auth)?;
if RemotePluginScope::from_marketplace_name(marketplace_name).is_none() {
return Err(RemotePluginCatalogError::UnknownMarketplace {
marketplace_name: marketplace_name.to_string(),
});
}
let base_url = config.chatgpt_base_url.trim_end_matches('/');
let url = format!("{base_url}/ps/plugins/{plugin_id}/install");
let client = build_reqwest_client();
let request = authenticated_request(client.post(&url), auth)?;
let response: RemotePluginInstallResponse = send_and_decode(request, &url).await?;
if response.id != plugin_id {
return Err(RemotePluginCatalogError::UnexpectedPluginId {
expected: plugin_id.to_string(),
actual: response.id,
});
}
if !response.enabled {
return Err(RemotePluginCatalogError::UnexpectedEnabledState {
plugin_id: plugin_id.to_string(),
expected_enabled: true,
actual_enabled: response.enabled,
});
}
Ok(())
}
fn build_remote_plugin_summary(
plugin: &RemotePluginDirectoryItem,
installed_plugin: Option<&RemotePluginInstalledItem>,