Files
xl-openai e428a12d22 [codex] Enable remote plugins by default (#30297)
## Summary

- enable the remote plugin feature by default
- promote the remote plugin feature from under development to stable
- preserve the existing `features.remote_plugin` override for explicitly
disabling it
- keep legacy disabled-path coverage explicit in TUI and app-server
tests

## Impact

Remote plugin functionality is enabled by default for configurations
that do not set the feature flag. The existing Codex backend
authentication gate still applies.

## Validation

- `just fmt`
- `just test -p codex-features`
- `just test -p codex-tui
plugins_popup_remote_section_fallback_states_snapshot`
- targeted `codex-app-server` plugin-list and skills-list tests
- `git diff --check`

The full TUI and app-server suites were also exercised locally. All
remote-plugin-related coverage passed; unrelated local
sandbox/test-binary failures remain outside this change.
2026-06-28 11:46:25 -07:00

2550 lines
80 KiB
Rust

use std::borrow::Cow;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use anyhow::Result;
use anyhow::bail;
use app_test_support::ChatGptAuthFixture;
use app_test_support::DEFAULT_CLIENT_NAME;
use app_test_support::TestAppServer;
use app_test_support::start_analytics_events_server;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use axum::Json;
use axum::Router;
use axum::extract::State;
use axum::http::HeaderMap;
use axum::http::StatusCode;
use axum::http::Uri;
use axum::http::header::AUTHORIZATION;
use axum::routing::get;
use codex_app_server_protocol::AppInfo;
use codex_app_server_protocol::AppSummary;
use codex_app_server_protocol::AppsListParams;
use codex_app_server_protocol::AppsListResponse;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::PluginAuthPolicy;
use codex_app_server_protocol::PluginAvailability;
use codex_app_server_protocol::PluginInstallParams;
use codex_app_server_protocol::PluginInstallResponse;
use codex_app_server_protocol::RequestId;
use codex_config::types::AuthCredentialsStoreMode;
use codex_utils_absolute_path::AbsolutePathBuf;
use flate2::Compression;
use flate2::write::GzEncoder;
use pretty_assertions::assert_eq;
use rmcp::handler::server::ServerHandler;
use rmcp::model::JsonObject;
use rmcp::model::ListToolsResult;
use rmcp::model::Meta;
use rmcp::model::ServerCapabilities;
use rmcp::model::ServerInfo;
use rmcp::model::Tool;
use rmcp::model::ToolAnnotations;
use rmcp::transport::StreamableHttpServerConfig;
use rmcp::transport::StreamableHttpService;
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use serde_json::json;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use wiremock::Match;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::ResponseTemplate;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::query_param;
// Plugin install tests wait on connector discovery after the install response path
// starts, which is noticeably slower on Windows CI.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
const REMOTE_PLUGIN_ID: &str = "plugins~Plugin_00000000000000000000000000000000";
const TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS: &str =
"CODEX_TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS";
#[tokio::test]
async fn plugin_install_rejects_relative_marketplace_paths() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_raw_request(
"plugin/install",
Some(serde_json::json!({
"marketplacePath": "relative-marketplace.json",
"pluginName": "missing-plugin",
})),
)
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("Invalid request"));
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_missing_install_source() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: None,
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(
err.error
.message
.contains("requires exactly one of marketplacePath or remoteMarketplaceName")
);
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_multiple_install_sources() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(AbsolutePathBuf::try_from(
codex_home.path().join("marketplace.json"),
)?),
remote_marketplace_name: Some("openai-curated-remote".to_string()),
plugin_name: "sample-plugin".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(
err.error
.message
.contains("requires exactly one of marketplacePath or remoteMarketplaceName")
);
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_remote_marketplace_when_plugins_are_disabled() -> Result<()> {
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join("config.toml"),
r#"[features]
plugins = false
"#,
)?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: None,
remote_marketplace_name: Some("openai-curated-remote".to_string()),
plugin_name: "plugins~Plugin_22222222222222222222222222222222".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(
err.error
.message
.contains("remote plugin install is not enabled")
);
Ok(())
}
#[tokio::test]
async fn plugin_install_writes_remote_plugin_to_cloud_and_cache() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
let installed_path = codex_home
.path()
.join("plugins/cache/openai-curated-remote/linear/1.2.3");
let remote_app_manifest = json!({
"apps": {
"linear-remote": {
"id": "remote-linear-app"
}
}
});
let bundle_url = mount_remote_plugin_bundle(
&server,
/*status_code*/ 200,
remote_plugin_bundle_tar_gz_bytes_with_contents(
r#"{"name":"linear","version":"0.0.1"}"#,
Some(r#"{"apps":{"linear-bundled":{"id":"bundled-linear-app"}}}"#),
)?,
)
.await;
configure_remote_plugin_test(codex_home.path(), &server)?;
mount_remote_plugin_detail_with_app_manifest(
&server,
REMOTE_PLUGIN_ID,
"1.2.3",
Some(&bundle_url),
remote_app_manifest.clone(),
)
.await;
mount_empty_remote_installed_plugins(&server).await;
mount_remote_plugin_install_after_cache_write(
&server,
REMOTE_PLUGIN_ID,
installed_path.join(".codex-plugin/plugin.json"),
)
.await;
let mut mcp = TestAppServer::new_with_env(
codex_home.path(),
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(
response,
PluginInstallResponse {
auth_policy: PluginAuthPolicy::OnUse,
apps_needing_auth: Vec::new(),
}
);
wait_for_remote_plugin_request_count(
&server,
"POST",
&format!("/ps/plugins/{REMOTE_PLUGIN_ID}/install"),
/*expected_count*/ 1,
)
.await?;
wait_for_remote_plugin_request_count(
&server,
"GET",
"/bundles/linear.tar.gz",
/*expected_count*/ 1,
)
.await?;
assert!(installed_path.join(".codex-plugin/plugin.json").is_file());
let installed_plugin_manifest: serde_json::Value = serde_json::from_str(
&std::fs::read_to_string(installed_path.join(".codex-plugin/plugin.json"))?,
)?;
assert_eq!(installed_plugin_manifest["name"], json!("linear"));
assert_eq!(installed_plugin_manifest["version"], json!("1.2.3"));
let installed_app_manifest: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(installed_path.join(".app.json"))?)?;
assert_eq!(installed_app_manifest, remote_app_manifest);
assert!(installed_path.join("skills/plan-work/SKILL.md").is_file());
assert!(
!codex_home
.path()
.join(format!(
"plugins/cache/openai-curated-remote/{REMOTE_PLUGIN_ID}/1.2.3"
))
.exists()
);
Ok(())
}
#[tokio::test]
async fn plugin_install_uses_remote_apps_needing_auth_response() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
let remote_app_manifest = json!({
"apps": {
"alpha": {
"id": "alpha",
"category": "Developer Tools"
}
}
});
let bundle_url = mount_remote_plugin_bundle(
&server,
/*status_code*/ 200,
remote_plugin_bundle_tar_gz_bytes("linear")?,
)
.await;
configure_remote_plugin_with_apps_test(codex_home.path(), &server)?;
mount_remote_plugin_detail_with_app_manifest(
&server,
REMOTE_PLUGIN_ID,
"1.2.3",
Some(&bundle_url),
remote_app_manifest,
)
.await;
mount_empty_remote_installed_plugins(&server).await;
mount_remote_plugin_install_with_apps_needing_auth(&server, REMOTE_PLUGIN_ID, &["alpha"]).await;
let mut mcp = TestAppServer::new_with_env(
codex_home.path(),
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(
response,
PluginInstallResponse {
auth_policy: PluginAuthPolicy::OnUse,
apps_needing_auth: vec![AppSummary {
id: "alpha".to_string(),
name: "alpha".to_string(),
description: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
category: Some("Developer Tools".to_string()),
}],
}
);
wait_for_remote_plugin_request_count(
&server,
"GET",
"/backend-api/connectors/directory/list",
/*expected_count*/ 0,
)
.await?;
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_missing_remote_bundle_url() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
configure_remote_plugin_test(codex_home.path(), &server)?;
mount_remote_plugin_detail(
&server,
REMOTE_PLUGIN_ID,
"1.2.3",
/*bundle_download_url*/ None,
)
.await;
mount_empty_remote_installed_plugins(&server).await;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32603);
assert!(
err.error
.message
.contains("backend did not return a download URL")
);
wait_for_remote_plugin_request_count(
&server,
"POST",
&format!("/ps/plugins/{REMOTE_PLUGIN_ID}/install"),
/*expected_count*/ 0,
)
.await?;
assert!(
!codex_home
.path()
.join("plugins/cache/openai-curated-remote/linear")
.exists()
);
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_plain_http_remote_bundle_url() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
let bundle_url = format!("{}/bundles/linear.tar.gz", server.uri());
configure_remote_plugin_test(codex_home.path(), &server)?;
mount_remote_plugin_detail(&server, REMOTE_PLUGIN_ID, "1.2.3", Some(&bundle_url)).await;
mount_empty_remote_installed_plugins(&server).await;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32603);
assert!(
err.error
.message
.contains("unsupported download URL scheme")
);
wait_for_remote_plugin_request_count(
&server,
"POST",
&format!("/ps/plugins/{REMOTE_PLUGIN_ID}/install"),
/*expected_count*/ 0,
)
.await?;
assert!(
!codex_home
.path()
.join("plugins/cache/openai-curated-remote/linear")
.exists()
);
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_invalid_remote_release_version() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
configure_remote_plugin_test(codex_home.path(), &server)?;
mount_remote_plugin_detail(
&server,
REMOTE_PLUGIN_ID,
"../1.2.3",
Some("https://127.0.0.1:1/bundles/linear.tar.gz"),
)
.await;
mount_empty_remote_installed_plugins(&server).await;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32603);
assert!(err.error.message.contains("invalid release version"));
wait_for_remote_plugin_request_count(
&server,
"POST",
&format!("/ps/plugins/{REMOTE_PLUGIN_ID}/install"),
/*expected_count*/ 0,
)
.await?;
assert!(
!codex_home
.path()
.join("plugins/cache/openai-curated-remote/linear")
.exists()
);
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_invalid_remote_plugin_name() -> Result<()> {
let codex_home = TempDir::new()?;
write_remote_plugin_catalog_config(codex_home.path(), "https://example.invalid/backend-api/")?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: None,
remote_marketplace_name: Some("openai-curated-remote".to_string()),
plugin_name: "linear/../../oops".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("invalid remote plugin id"));
Ok(())
}
#[tokio::test]
async fn plugin_install_tracks_analytics_when_remote_detail_fetch_fails() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
configure_remote_plugin_test(codex_home.path(), &server)?;
mount_empty_remote_installed_plugins(&server).await;
mount_backend_analytics_events(&server).await;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("failed with status 404"));
let payload = wait_for_plugin_analytics_payload(&server).await?;
let event_params = &payload["events"][0]["event_params"];
assert_eq!(
payload["events"][0]["event_type"],
"codex_plugin_install_failed"
);
assert_eq!(event_params["plugin_id"], json!(null));
assert_eq!(event_params["remote_plugin_id"], REMOTE_PLUGIN_ID);
assert_eq!(event_params["plugin_name"], json!(null));
assert_eq!(event_params["marketplace_name"], json!(null));
assert_eq!(
event_params["error_type"],
"remote_catalog_unexpected_status"
);
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_remote_plugin_disabled_by_admin_before_download() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
let bundle_url = mount_remote_plugin_bundle(
&server,
/*status_code*/ 200,
remote_plugin_bundle_tar_gz_bytes("linear")?,
)
.await;
configure_remote_plugin_test(codex_home.path(), &server)?;
mount_remote_plugin_detail_with_status(
&server,
REMOTE_PLUGIN_ID,
"1.2.3",
Some(&bundle_url),
PluginAvailability::DisabledByAdmin,
)
.await;
mount_empty_remote_installed_plugins(&server).await;
let mut mcp = TestAppServer::new_with_env(
codex_home.path(),
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("disabled by admin"));
wait_for_remote_plugin_request_count(
&server,
"GET",
"/bundles/linear.tar.gz",
/*expected_count*/ 0,
)
.await?;
wait_for_remote_plugin_request_count(
&server,
"POST",
&format!("/ps/plugins/{REMOTE_PLUGIN_ID}/install"),
/*expected_count*/ 0,
)
.await?;
assert!(
!codex_home
.path()
.join("plugins/cache/openai-curated-remote/linear")
.exists()
);
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_remote_plugin_not_available() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
configure_remote_plugin_test(codex_home.path(), &server)?;
mount_remote_plugin_detail_with_install_policy(
&server,
REMOTE_PLUGIN_ID,
"1.2.3",
/*install_policy*/ "NOT_AVAILABLE",
)
.await;
mount_empty_remote_installed_plugins(&server).await;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("not available for install"));
wait_for_remote_plugin_request_count(
&server,
"POST",
&format!("/ps/plugins/{REMOTE_PLUGIN_ID}/install"),
/*expected_count*/ 0,
)
.await?;
Ok(())
}
#[tokio::test]
async fn plugin_install_rejects_when_workspace_codex_plugins_disabled() -> Result<()> {
let codex_home = TempDir::new()?;
let repo_root = TempDir::new()?;
let server = MockServer::start().await;
write_plugins_enabled_config_with_base_url(
codex_home.path(),
&format!("{}/backend-api/", server.uri()),
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123")
.plan_type("team"),
AuthCredentialsStoreMode::File,
)?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
/*install_policy*/ None,
/*auth_policy*/ None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
Mock::given(method("GET"))
.and(path("/backend-api/accounts/account-123/settings"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(r#"{"beta_settings":{"enable_plugins":false}}"#),
)
.mount(&server)
.await;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(
err.error
.message
.contains("Codex plugins are disabled for this workspace")
);
Ok(())
}
#[tokio::test]
async fn plugin_install_returns_invalid_request_for_missing_marketplace_file() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(AbsolutePathBuf::try_from(
codex_home.path().join("missing-marketplace.json"),
)?),
remote_marketplace_name: None,
plugin_name: "missing-plugin".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("marketplace file"));
assert!(err.error.message.contains("does not exist"));
Ok(())
}
#[tokio::test]
async fn plugin_install_returns_invalid_request_for_not_available_plugin() -> Result<()> {
let codex_home = TempDir::new()?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
Some("NOT_AVAILABLE"),
/*auth_policy*/ None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("not available for install"));
Ok(())
}
#[tokio::test]
async fn plugin_install_returns_invalid_request_for_disallowed_product_plugin() -> Result<()> {
let codex_home = TempDir::new()?;
let repo_root = TempDir::new()?;
std::fs::create_dir_all(repo_root.path().join(".agents/plugins"))?;
std::fs::write(
repo_root.path().join(".agents/plugins/marketplace.json"),
r#"{
"name": "debug",
"plugins": [
{
"name": "sample-plugin",
"source": {
"source": "local",
"path": "./sample-plugin"
},
"policy": {
"products": ["CHATGPT"]
}
}
]
}"#,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp =
TestAppServer::new_with_args(codex_home.path(), &["--session-source", "atlas"]).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert!(err.error.message.contains("not available for install"));
Ok(())
}
#[tokio::test]
async fn plugin_install_tracks_analytics_event() -> Result<()> {
let analytics_server = start_analytics_events_server().await?;
let codex_home = TempDir::new()?;
write_analytics_config(codex_home.path(), &analytics_server.uri())?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
/*install_policy*/ None,
/*auth_policy*/ None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let payload = wait_for_plugin_analytics_payload(&analytics_server).await?;
assert_eq!(
payload,
json!({
"events": [{
"event_type": "codex_plugin_installed",
"event_params": {
"plugin_id": "sample-plugin@debug",
"remote_plugin_id": null,
"plugin_name": "sample-plugin",
"marketplace_name": "debug",
"has_skills": false,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
}]
})
);
Ok(())
}
#[tokio::test]
async fn plugin_install_failure_tracks_analytics_event() -> Result<()> {
let analytics_server = start_analytics_events_server().await?;
let codex_home = TempDir::new()?;
write_analytics_config(codex_home.path(), &analytics_server.uri())?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./missing-plugin",
/*install_policy*/ None,
/*auth_policy*/ None,
)?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
let payload = wait_for_plugin_analytics_payload(&analytics_server).await?;
let event_params = &payload["events"][0]["event_params"];
assert_eq!(
payload["events"][0]["event_type"],
"codex_plugin_install_failed"
);
assert_eq!(event_params["plugin_id"], "sample-plugin@debug");
assert_eq!(event_params["remote_plugin_id"], json!(null));
assert_eq!(event_params["plugin_name"], "sample-plugin");
assert_eq!(event_params["marketplace_name"], "debug");
assert_eq!(event_params["has_skills"], json!(null));
assert_eq!(event_params["mcp_server_count"], json!(null));
assert_eq!(event_params["connector_ids"], json!(null));
assert_eq!(event_params["product_client_id"], DEFAULT_CLIENT_NAME);
assert_eq!(event_params["error_type"], "store_invalid");
Ok(())
}
#[tokio::test]
async fn plugin_install_tracks_remote_plugin_analytics_event() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
let bundle_url = mount_remote_plugin_bundle(
&server,
/*status_code*/ 200,
remote_plugin_bundle_tar_gz_bytes("linear")?,
)
.await;
configure_remote_plugin_test(codex_home.path(), &server)?;
mount_remote_plugin_detail(&server, REMOTE_PLUGIN_ID, "1.2.3", Some(&bundle_url)).await;
mount_empty_remote_installed_plugins(&server).await;
mount_remote_plugin_install(&server, REMOTE_PLUGIN_ID).await;
mount_backend_analytics_events(&server).await;
let mut mcp = TestAppServer::new_with_env(
codex_home.path(),
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let payload = wait_for_plugin_analytics_payload(&server).await?;
assert_eq!(
payload,
json!({
"events": [{
"event_type": "codex_plugin_installed",
"event_params": {
"plugin_id": "linear@openai-curated-remote",
"remote_plugin_id": REMOTE_PLUGIN_ID,
"plugin_name": "linear",
"marketplace_name": "openai-curated-remote",
"has_skills": true,
"mcp_server_count": 0,
"connector_ids": [],
"product_client_id": DEFAULT_CLIENT_NAME,
}
}]
})
);
Ok(())
}
#[tokio::test]
async fn plugin_install_preserves_status_when_remote_bundle_error_body_is_too_large() -> Result<()>
{
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
let bundle_url =
mount_remote_plugin_bundle(&server, /*status_code*/ 503, vec![b'x'; 8 * 1024 + 1]).await;
configure_remote_plugin_test(codex_home.path(), &server)?;
mount_remote_plugin_detail(&server, REMOTE_PLUGIN_ID, "1.2.3", Some(&bundle_url)).await;
mount_empty_remote_installed_plugins(&server).await;
mount_remote_plugin_install(&server, REMOTE_PLUGIN_ID).await;
mount_backend_analytics_events(&server).await;
let mut mcp = TestAppServer::new_with_env(
codex_home.path(),
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32603);
assert!(err.error.message.contains("failed with status 503"));
assert!(
err.error
.message
.contains("[response body truncated after 8192 bytes]")
);
assert_eq!(
err.error
.message
.bytes()
.filter(|byte| *byte == b'x')
.count(),
8192
);
assert!(!err.error.message.contains("exceeded maximum size"));
wait_for_remote_plugin_request_count(
&server,
"GET",
"/bundles/linear.tar.gz",
/*expected_count*/ 1,
)
.await?;
wait_for_remote_plugin_request_count(
&server,
"POST",
&format!("/ps/plugins/{REMOTE_PLUGIN_ID}/install"),
/*expected_count*/ 0,
)
.await?;
let payload = wait_for_plugin_analytics_payload(&server).await?;
let event_params = &payload["events"][0]["event_params"];
assert_eq!(
payload["events"][0]["event_type"],
"codex_plugin_install_failed"
);
assert_eq!(event_params["plugin_id"], "linear@openai-curated-remote");
assert_eq!(event_params["remote_plugin_id"], REMOTE_PLUGIN_ID);
assert_eq!(event_params["marketplace_name"], "openai-curated-remote");
assert_eq!(event_params["error_type"], "remote_bundle_download_status");
assert!(
!codex_home
.path()
.join("plugins/cache/openai-curated-remote/linear")
.exists()
);
Ok(())
}
#[tokio::test]
async fn plugin_install_returns_apps_needing_auth() -> Result<()> {
let connectors = vec![
AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
logo_url_dark: None,
icon_assets: None,
icon_dark_assets: None,
distribution_channel: Some("featured".to_string()),
branding: None,
app_metadata: None,
labels: None,
install_url: None,
is_accessible: false,
is_enabled: true,
plugin_display_names: Vec::new(),
},
AppInfo {
id: "beta".to_string(),
name: "Beta".to_string(),
description: Some("Beta connector".to_string()),
logo_url: None,
logo_url_dark: None,
icon_assets: None,
icon_dark_assets: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: None,
is_accessible: false,
is_enabled: true,
plugin_display_names: Vec::new(),
},
];
let tools = vec![connector_tool("beta", "Beta App")?];
let (server_url, server_handle, server_control) = start_apps_server(connectors, tools).await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
/*install_policy*/ None,
/*auth_policy*/ None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &["alpha", "beta"])?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let directory_requests_before_install = server_control.directory_request_count();
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(
response,
PluginInstallResponse {
auth_policy: PluginAuthPolicy::OnInstall,
apps_needing_auth: vec![AppSummary {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
category: None,
}],
}
);
assert!(server_control.directory_request_count() > directory_requests_before_install);
server_handle.abort();
let _ = server_handle.await;
Ok(())
}
#[tokio::test]
async fn plugin_install_skips_mcp_oauth_for_chatgpt_dual_surface_plugin() -> Result<()> {
let connectors = vec![AppInfo {
id: "sample-mcp".to_string(),
name: "Sample MCP".to_string(),
description: Some("Sample MCP connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
logo_url_dark: None,
icon_assets: None,
icon_dark_assets: None,
distribution_channel: Some("featured".to_string()),
branding: None,
app_metadata: None,
labels: None,
install_url: None,
is_accessible: false,
is_enabled: true,
plugin_display_names: Vec::new(),
}];
let (apps_server_url, apps_server_handle, _apps_server_control) =
start_apps_server(connectors, Vec::new()).await?;
let oauth_server = MockServer::start().await;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &apps_server_url)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
/*install_policy*/ None,
/*auth_policy*/ None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &["sample-mcp"])?;
write_plugin_mcp_config(repo_root.path(), "sample-plugin", &oauth_server.uri())?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.auth_policy, PluginAuthPolicy::OnInstall);
assert_eq!(oauth_discovery_request_count(&oauth_server).await, 0);
apps_server_handle.abort();
let _ = apps_server_handle.await;
Ok(())
}
#[tokio::test]
async fn plugin_install_starts_mcp_oauth_with_formerly_disallowed_plugin_app() -> Result<()> {
let (apps_server_url, apps_server_handle, _apps_server_control) =
start_apps_server(Vec::new(), Vec::new()).await?;
let oauth_server = MockServer::start().await;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &apps_server_url)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
/*install_policy*/ None,
/*auth_policy*/ None,
)?;
write_plugin_source(
repo_root.path(),
"sample-plugin",
&["asdk_app_6938a94a61d881918ef32cb999ff937c"],
)?;
write_plugin_mcp_config(repo_root.path(), "sample-plugin", &oauth_server.uri())?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(
response,
PluginInstallResponse {
auth_policy: PluginAuthPolicy::OnInstall,
apps_needing_auth: vec![AppSummary {
id: "asdk_app_6938a94a61d881918ef32cb999ff937c".to_string(),
name: "asdk_app_6938a94a61d881918ef32cb999ff937c".to_string(),
description: None,
install_url: Some(
"https://chatgpt.com/apps/asdk-app-6938a94a61d881918ef32cb999ff937c/asdk_app_6938a94a61d881918ef32cb999ff937c"
.to_string(),
),
category: None,
}],
}
);
assert!(oauth_discovery_request_count(&oauth_server).await > 0);
apps_server_handle.abort();
let _ = apps_server_handle.await;
Ok(())
}
#[tokio::test]
async fn plugin_install_starts_mcp_oauth_through_protected_resource_metadata() -> Result<()> {
let resource_server = MockServer::start().await;
let authorization_server = MockServer::start().await;
let resource_metadata_url = format!("{}/oauth-resource", resource_server.uri());
let challenge = format!("Bearer resource_metadata=\"{resource_metadata_url}\"");
Mock::given(method("GET"))
.and(path("/mcp"))
.respond_with(
ResponseTemplate::new(401).insert_header("WWW-Authenticate", challenge.as_str()),
)
.mount(&resource_server)
.await;
Mock::given(method("GET"))
.and(path("/oauth-resource"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"resource": resource_server.uri(),
"authorization_servers": [authorization_server.uri()],
})))
.mount(&resource_server)
.await;
Mock::given(method("GET"))
.and(path("/.well-known/oauth-authorization-server"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"authorization_endpoint": format!("{}/oauth/authorize", authorization_server.uri()),
"token_endpoint": format!("{}/oauth/token", authorization_server.uri()),
"registration_endpoint": format!("{}/oauth/register", authorization_server.uri()),
"response_types_supported": ["code"],
"code_challenge_methods_supported": ["S256"],
})))
.mount(&authorization_server)
.await;
Mock::given(method("POST"))
.and(path("/oauth/register"))
.respond_with(ResponseTemplate::new(400))
.mount(&authorization_server)
.await;
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join("config.toml"),
"[features]\nplugins = true\n",
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
/*install_policy*/ None,
/*auth_policy*/ None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
write_plugin_mcp_config(repo_root.path(), "sample-plugin", &resource_server.uri())?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let _: PluginInstallResponse = to_response(response)?;
wait_for_remote_plugin_request_count(
&authorization_server,
"POST",
"/oauth/register",
/*expected_count*/ 1,
)
.await?;
let resource_metadata_requested = resource_server
.received_requests()
.await
.unwrap_or_default()
.iter()
.any(|request| request.url.path() == "/oauth-resource");
assert!(resource_metadata_requested);
Ok(())
}
#[tokio::test]
async fn plugin_install_starts_mcp_oauth_for_api_key_dual_surface_plugin() -> Result<()> {
let oauth_server = MockServer::start().await;
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join("config.toml"),
r#"
mcp_oauth_credentials_store = "file"
[features]
plugins = true
connectors = true
"#,
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
/*install_policy*/ None,
/*auth_policy*/ None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &["sample-mcp"])?;
write_plugin_mcp_config(repo_root.path(), "sample-plugin", &oauth_server.uri())?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new_with_env(
codex_home.path(),
&[("OPENAI_API_KEY", Some("test-api-key"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.auth_policy, PluginAuthPolicy::OnInstall);
assert!(oauth_discovery_request_count(&oauth_server).await > 0);
Ok(())
}
#[tokio::test]
async fn plugin_install_starts_remote_mcp_oauth_for_install_response_only_app() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
let oauth_server = MockServer::start().await;
let bundle_url = mount_remote_plugin_bundle(
&server,
/*status_code*/ 200,
remote_plugin_bundle_tar_gz_bytes_with_mcp_config("linear", &oauth_server.uri())?,
)
.await;
configure_remote_plugin_with_apps_test(codex_home.path(), &server)?;
mount_remote_plugin_detail(&server, REMOTE_PLUGIN_ID, "1.2.3", Some(&bundle_url)).await;
mount_empty_remote_installed_plugins(&server).await;
mount_remote_plugin_install_with_apps_needing_auth(&server, REMOTE_PLUGIN_ID, &["alpha"]).await;
let mut mcp = TestAppServer::new_with_env(
codex_home.path(),
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(
response,
PluginInstallResponse {
auth_policy: PluginAuthPolicy::OnUse,
apps_needing_auth: vec![AppSummary {
id: "alpha".to_string(),
name: "alpha".to_string(),
description: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
category: None,
}],
}
);
assert!(oauth_discovery_request_count(&oauth_server).await > 0);
Ok(())
}
#[tokio::test]
async fn plugin_install_skips_remote_mcp_oauth_for_bundled_same_name_app() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
let oauth_server = MockServer::start().await;
let bundle_url = mount_remote_plugin_bundle(
&server,
/*status_code*/ 200,
remote_plugin_bundle_tar_gz_bytes_with_app_and_mcp_config(
"linear",
r#"{"apps":{"sample-mcp":{"id":"alpha"}}}"#,
&oauth_server.uri(),
)?,
)
.await;
configure_remote_plugin_with_apps_test(codex_home.path(), &server)?;
mount_remote_plugin_detail(&server, REMOTE_PLUGIN_ID, "1.2.3", Some(&bundle_url)).await;
mount_empty_remote_installed_plugins(&server).await;
mount_remote_plugin_install_with_apps_needing_auth(&server, REMOTE_PLUGIN_ID, &["alpha"]).await;
let mut mcp = TestAppServer::new_with_env(
codex_home.path(),
&[(TEST_ALLOW_HTTP_REMOTE_PLUGIN_BUNDLE_DOWNLOADS, Some("1"))],
)
.await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = send_remote_plugin_install_request(&mut mcp, REMOTE_PLUGIN_ID).await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(
response,
PluginInstallResponse {
auth_policy: PluginAuthPolicy::OnUse,
apps_needing_auth: vec![AppSummary {
id: "alpha".to_string(),
name: "alpha".to_string(),
description: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
category: None,
}],
}
);
assert_eq!(oauth_discovery_request_count(&oauth_server).await, 0);
Ok(())
}
#[tokio::test]
async fn plugin_install_includes_formerly_disallowed_apps_needing_auth() -> Result<()> {
let connectors = vec![AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
logo_url_dark: None,
icon_assets: None,
icon_dark_assets: None,
distribution_channel: Some("featured".to_string()),
branding: None,
app_metadata: None,
labels: None,
install_url: None,
is_accessible: false,
is_enabled: true,
plugin_display_names: Vec::new(),
}];
let (server_url, server_handle, server_control) =
start_apps_server(connectors, Vec::new()).await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
/*install_policy*/ None,
Some("ON_USE"),
)?;
write_plugin_source(
repo_root.path(),
"sample-plugin",
&["alpha", "asdk_app_6938a94a61d881918ef32cb999ff937c"],
)?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let directory_requests_before_install =
warm_app_directory_cache(&mut mcp, &server_control, "Alpha").await?;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(
response,
PluginInstallResponse {
auth_policy: PluginAuthPolicy::OnUse,
apps_needing_auth: vec![AppSummary {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
category: None,
},
AppSummary {
id: "asdk_app_6938a94a61d881918ef32cb999ff937c".to_string(),
name: "asdk_app_6938a94a61d881918ef32cb999ff937c".to_string(),
description: None,
install_url: Some(
"https://chatgpt.com/apps/asdk-app-6938a94a61d881918ef32cb999ff937c/asdk_app_6938a94a61d881918ef32cb999ff937c"
.to_string(),
),
category: None,
}],
}
);
assert_eq!(
server_control.directory_request_count(),
directory_requests_before_install
);
server_handle.abort();
let _ = server_handle.await;
Ok(())
}
#[tokio::test]
async fn plugin_install_makes_bundled_mcp_servers_available_to_followup_requests() -> Result<()> {
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join("config.toml"),
"[features]\nplugins = true\n",
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
/*install_policy*/ None,
/*auth_policy*/ None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
std::fs::write(
repo_root.path().join("sample-plugin/.mcp.json"),
r#"{
"mcpServers": {
"sample-mcp": {
"command": "echo"
}
}
}"#,
)?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path: Some(marketplace_path),
remote_marketplace_name: None,
plugin_name: "sample-plugin".to_string(),
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
assert!(!config.contains("[mcp_servers.sample-mcp]"));
assert!(!config.contains("command = \"echo\""));
let request_id = mcp
.send_raw_request(
"mcpServer/oauth/login",
Some(json!({
"name": "sample-mcp",
})),
)
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert_eq!(
err.error.message,
"OAuth login is only supported for streamable HTTP servers."
);
Ok(())
}
#[derive(Clone)]
struct AppsServerState {
response: Arc<StdMutex<serde_json::Value>>,
directory_request_count: Arc<AtomicUsize>,
}
#[derive(Clone)]
struct AppsServerControl {
directory_request_count: Arc<AtomicUsize>,
}
impl AppsServerControl {
fn directory_request_count(&self) -> usize {
self.directory_request_count.load(Ordering::SeqCst)
}
}
async fn warm_app_directory_cache(
mcp: &mut TestAppServer,
server_control: &AppsServerControl,
expected_app_name: &str,
) -> Result<usize> {
let app_list_request_id = mcp
.send_apps_list_request(AppsListParams {
force_refetch: true,
..Default::default()
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(app_list_request_id)),
)
.await??;
let response: AppsListResponse = to_response(response)?;
assert!(
response
.data
.iter()
.any(|app| app.name == expected_app_name)
);
let directory_request_count = server_control.directory_request_count();
assert!(directory_request_count > 0);
Ok(directory_request_count)
}
#[derive(Clone)]
struct PluginInstallMcpServer {
tools: Arc<StdMutex<Vec<Tool>>>,
}
impl ServerHandler for PluginInstallMcpServer {
fn get_info(&self) -> ServerInfo {
ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
}
fn list_tools(
&self,
_request: Option<rmcp::model::PaginatedRequestParams>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListToolsResult, rmcp::ErrorData>> + Send + '_
{
let tools = self.tools.clone();
async move {
let tools = tools
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone();
Ok(ListToolsResult {
tools,
next_cursor: None,
meta: None,
})
}
}
}
async fn start_apps_server(
connectors: Vec<AppInfo>,
tools: Vec<Tool>,
) -> Result<(String, JoinHandle<()>, AppsServerControl)> {
let directory_request_count = Arc::new(AtomicUsize::new(0));
let state = Arc::new(AppsServerState {
response: Arc::new(StdMutex::new(
json!({ "apps": connectors, "next_token": null }),
)),
directory_request_count: directory_request_count.clone(),
});
let server_control = AppsServerControl {
directory_request_count,
};
let tools = Arc::new(StdMutex::new(tools));
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let mcp_service = StreamableHttpService::new(
{
let tools = tools.clone();
move || {
Ok(PluginInstallMcpServer {
tools: tools.clone(),
})
}
},
Arc::new(LocalSessionManager::default()),
StreamableHttpServerConfig::default(),
);
let router = Router::new()
.route("/connectors/directory/list", get(list_directory_connectors))
.route(
"/connectors/directory/list_workspace",
get(list_directory_connectors),
)
.with_state(state)
.nest_service("/api/codex/ps/mcp", mcp_service);
let handle = tokio::spawn(async move {
let _ = axum::serve(listener, router).await;
});
Ok((format!("http://{addr}"), handle, server_control))
}
async fn list_directory_connectors(
State(state): State<Arc<AppsServerState>>,
headers: HeaderMap,
uri: Uri,
) -> Result<impl axum::response::IntoResponse, StatusCode> {
state.directory_request_count.fetch_add(1, Ordering::SeqCst);
let bearer_ok = headers
.get(AUTHORIZATION)
.and_then(|value| value.to_str().ok())
.is_some_and(|value| value == "Bearer chatgpt-token");
let account_ok = headers
.get("chatgpt-account-id")
.and_then(|value| value.to_str().ok())
.is_some_and(|value| value == "account-123");
let external_logos_ok = uri
.query()
.is_some_and(|query| query.split('&').any(|pair| pair == "external_logos=true"));
if !bearer_ok || !account_ok {
Err(StatusCode::UNAUTHORIZED)
} else if !external_logos_ok {
Err(StatusCode::BAD_REQUEST)
} else {
let response = state
.response
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone();
Ok(Json(response))
}
}
fn connector_tool(connector_id: &str, connector_name: &str) -> Result<Tool> {
let schema: JsonObject = serde_json::from_value(json!({
"type": "object",
"additionalProperties": false
}))?;
let mut tool = Tool::new(
Cow::Owned(format!("connector_{connector_id}")),
Cow::Borrowed("Connector test tool"),
Arc::new(schema),
);
tool.annotations = Some(ToolAnnotations::new().read_only(true));
let mut meta = Meta::new();
meta.0
.insert("connector_id".to_string(), json!(connector_id));
meta.0
.insert("connector_name".to_string(), json!(connector_name));
tool.meta = Some(meta);
Ok(tool)
}
fn write_connectors_config(codex_home: &std::path::Path, base_url: &str) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
chatgpt_base_url = "{base_url}"
mcp_oauth_credentials_store = "file"
[features]
connectors = true
"#
),
)
}
fn write_plugins_enabled_config_with_base_url(
codex_home: &std::path::Path,
base_url: &str,
) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"chatgpt_base_url = "{base_url}"
[features]
plugins = true
"#,
),
)
}
fn write_analytics_config(codex_home: &std::path::Path, base_url: &str) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!("chatgpt_base_url = \"{base_url}\"\n"),
)
}
async fn mount_backend_analytics_events(server: &MockServer) {
Mock::given(method("POST"))
.and(path("/backend-api/codex/analytics-events/events"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"{"status":"ok"}"#))
.mount(server)
.await;
}
async fn wait_for_plugin_analytics_payload(server: &MockServer) -> Result<serde_json::Value> {
timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if let Some(request) = requests.iter().find(|request| {
request.method == "POST"
&& request
.url
.path()
.ends_with("/codex/analytics-events/events")
}) {
return serde_json::from_slice(&request.body)
.map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}"));
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?
}
async fn oauth_discovery_request_count(server: &MockServer) -> usize {
server
.received_requests()
.await
.unwrap_or_default()
.iter()
.filter(|request| request.url.path().contains("oauth-authorization-server"))
.count()
}
fn write_remote_plugin_catalog_config(
codex_home: &std::path::Path,
base_url: &str,
) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
chatgpt_base_url = "{base_url}"
[features]
plugins = true
"#
),
)
}
fn configure_remote_plugin_test(codex_home: &std::path::Path, server: &MockServer) -> Result<()> {
write_remote_plugin_catalog_config(codex_home, &format!("{}/backend-api/", server.uri()))?;
write_chatgpt_auth(
codex_home,
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)
}
fn configure_remote_plugin_with_apps_test(
codex_home: &std::path::Path,
server: &MockServer,
) -> Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
chatgpt_base_url = "{}/backend-api/"
[features]
plugins = true
connectors = true
"#,
server.uri()
),
)?;
write_chatgpt_auth(
codex_home,
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)
}
async fn mount_remote_plugin_bundle(
server: &MockServer,
status_code: u16,
body: Vec<u8>,
) -> String {
Mock::given(method("GET"))
.and(path("/bundles/linear.tar.gz"))
.respond_with(
ResponseTemplate::new(status_code)
.insert_header("content-type", "application/gzip")
.set_body_bytes(body),
)
.mount(server)
.await;
format!("{}/bundles/linear.tar.gz", server.uri())
}
async fn mount_remote_plugin_detail(
server: &MockServer,
remote_plugin_id: &str,
release_version: &str,
bundle_download_url: Option<&str>,
) {
mount_remote_plugin_detail_with_status(
server,
remote_plugin_id,
release_version,
bundle_download_url,
PluginAvailability::Available,
)
.await;
}
async fn mount_remote_plugin_detail_with_app_manifest(
server: &MockServer,
remote_plugin_id: &str,
release_version: &str,
bundle_download_url: Option<&str>,
app_manifest: serde_json::Value,
) {
mount_remote_plugin_detail_with_status_and_app_manifest(
server,
remote_plugin_id,
release_version,
bundle_download_url,
PluginAvailability::Available,
Some(app_manifest),
)
.await;
}
async fn mount_remote_plugin_detail_with_status(
server: &MockServer,
remote_plugin_id: &str,
release_version: &str,
bundle_download_url: Option<&str>,
status: PluginAvailability,
) {
mount_remote_plugin_detail_with_status_and_app_manifest(
server,
remote_plugin_id,
release_version,
bundle_download_url,
status,
/*app_manifest*/ None,
)
.await;
}
async fn mount_remote_plugin_detail_with_status_and_app_manifest(
server: &MockServer,
remote_plugin_id: &str,
release_version: &str,
bundle_download_url: Option<&str>,
status: PluginAvailability,
app_manifest: Option<serde_json::Value>,
) {
mount_remote_plugin_detail_with_options(
server,
remote_plugin_id,
release_version,
bundle_download_url,
status,
"AVAILABLE",
app_manifest,
)
.await;
}
async fn mount_remote_plugin_detail_with_install_policy(
server: &MockServer,
remote_plugin_id: &str,
release_version: &str,
install_policy: &str,
) {
mount_remote_plugin_detail_with_options(
server,
remote_plugin_id,
release_version,
/*bundle_download_url*/ None,
PluginAvailability::Available,
install_policy,
/*app_manifest*/ None,
)
.await;
}
async fn mount_remote_plugin_detail_with_options(
server: &MockServer,
remote_plugin_id: &str,
release_version: &str,
bundle_download_url: Option<&str>,
status: PluginAvailability,
install_policy: &str,
app_manifest: Option<serde_json::Value>,
) {
let status = match status {
PluginAvailability::Available => "ENABLED",
PluginAvailability::DisabledByAdmin => "DISABLED_BY_ADMIN",
};
let bundle_download_url_field = bundle_download_url
.map(|url| format!(r#" "bundle_download_url": "{url}","#))
.unwrap_or_default();
let app_manifest_field = app_manifest
.map(|manifest| format!(r#" "app_manifest": {manifest},"#))
.unwrap_or_default();
let detail_body = format!(
r#"{{
"id": "{remote_plugin_id}",
"name": "linear",
"scope": "GLOBAL",
"installation_policy": "{install_policy}",
"authentication_policy": "ON_USE",
"status": "{status}",
"release": {{
"version": "{release_version}",
{bundle_download_url_field}
"display_name": "Linear",
"description": "Track work in Linear",
"app_ids": [],
{app_manifest_field}
"interface": {{
"short_description": "Plan and track work"
}},
"skills": []
}}
}}"#
);
Mock::given(method("GET"))
.and(path(format!("/backend-api/ps/plugins/{remote_plugin_id}")))
.and(query_param("includeDownloadUrls", "true"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(detail_body))
.mount(server)
.await;
}
async fn mount_empty_remote_installed_plugins(server: &MockServer) {
Mock::given(method("GET"))
.and(path("/backend-api/ps/plugins/installed"))
.and(query_param("scope", "GLOBAL"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(
r#"{
"plugins": [],
"pagination": {
"limit": 50,
"next_page_token": null
}
}"#,
))
.mount(server)
.await;
}
async fn mount_remote_plugin_install(server: &MockServer, remote_plugin_id: &str) {
Mock::given(method("POST"))
.and(path(format!(
"/backend-api/ps/plugins/{remote_plugin_id}/install"
)))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(format!(r#"{{"id":"{remote_plugin_id}","enabled":true}}"#)),
)
.mount(server)
.await;
}
async fn mount_remote_plugin_install_with_apps_needing_auth(
server: &MockServer,
remote_plugin_id: &str,
app_ids_needing_auth: &[&str],
) {
Mock::given(method("POST"))
.and(path(format!(
"/backend-api/ps/plugins/{remote_plugin_id}/install"
)))
.and(query_param("includeAppsNeedingAuth", "true"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"id": remote_plugin_id,
"enabled": true,
"app_ids_needing_auth": app_ids_needing_auth,
})))
.mount(server)
.await;
}
#[derive(Debug, Clone)]
struct CacheManifestExists {
manifest_path: std::path::PathBuf,
}
impl Match for CacheManifestExists {
fn matches(&self, _request: &Request) -> bool {
self.manifest_path.is_file()
}
}
async fn mount_remote_plugin_install_after_cache_write(
server: &MockServer,
remote_plugin_id: &str,
manifest_path: std::path::PathBuf,
) {
Mock::given(method("POST"))
.and(path(format!(
"/backend-api/ps/plugins/{remote_plugin_id}/install"
)))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.and(CacheManifestExists { manifest_path })
.respond_with(
ResponseTemplate::new(200)
.set_body_string(format!(r#"{{"id":"{remote_plugin_id}","enabled":true}}"#)),
)
.mount(server)
.await;
}
async fn send_remote_plugin_install_request(
mcp: &mut TestAppServer,
remote_plugin_id: &str,
) -> Result<i64> {
mcp.send_plugin_install_request(PluginInstallParams {
marketplace_path: None,
remote_marketplace_name: Some("caller-marketplace-is-ignored".to_string()),
plugin_name: remote_plugin_id.to_string(),
})
.await
}
async fn wait_for_remote_plugin_request_count(
server: &MockServer,
method_name: &str,
path_suffix: &str,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
bail!("wiremock did not record requests");
};
let request_count = requests
.iter()
.filter(|request| {
request.method == method_name && request.url.path().ends_with(path_suffix)
})
.count();
if request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if request_count > expected_count {
bail!(
"expected exactly {expected_count} {method_name} {path_suffix} requests, got {request_count}"
);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
fn write_plugin_marketplace(
repo_root: &std::path::Path,
marketplace_name: &str,
plugin_name: &str,
source_path: &str,
install_policy: Option<&str>,
auth_policy: Option<&str>,
) -> std::io::Result<()> {
let policy = if install_policy.is_some() || auth_policy.is_some() {
let installation = install_policy
.map(|installation| format!("\n \"installation\": \"{installation}\""))
.unwrap_or_default();
let separator = if install_policy.is_some() && auth_policy.is_some() {
","
} else {
""
};
let authentication = auth_policy
.map(|authentication| {
format!("{separator}\n \"authentication\": \"{authentication}\"")
})
.unwrap_or_default();
format!(",\n \"policy\": {{{installation}{authentication}\n }}")
} else {
String::new()
};
std::fs::create_dir_all(repo_root.join(".git"))?;
std::fs::create_dir_all(repo_root.join(".agents/plugins"))?;
std::fs::write(
repo_root.join(".agents/plugins/marketplace.json"),
format!(
r#"{{
"name": "{marketplace_name}",
"plugins": [
{{
"name": "{plugin_name}",
"source": {{
"source": "local",
"path": "{source_path}"
}}{policy}
}}
]
}}"#
),
)
}
fn write_plugin_source(
repo_root: &std::path::Path,
plugin_name: &str,
app_ids: &[&str],
) -> Result<()> {
let plugin_root = repo_root.join(plugin_name);
std::fs::create_dir_all(plugin_root.join(".codex-plugin"))?;
std::fs::write(
plugin_root.join(".codex-plugin/plugin.json"),
format!(r#"{{"name":"{plugin_name}"}}"#),
)?;
let apps = app_ids
.iter()
.map(|app_id| ((*app_id).to_string(), json!({ "id": app_id })))
.collect::<serde_json::Map<_, _>>();
std::fs::write(
plugin_root.join(".app.json"),
serde_json::to_vec_pretty(&json!({ "apps": apps }))?,
)?;
Ok(())
}
fn write_plugin_mcp_config(
repo_root: &std::path::Path,
plugin_name: &str,
mcp_base_url: &str,
) -> Result<()> {
std::fs::write(
repo_root.join(plugin_name).join(".mcp.json"),
format!(
r#"{{
"mcpServers": {{
"sample-mcp": {{
"type": "http",
"url": "{mcp_base_url}/mcp"
}}
}}
}}"#
),
)?;
Ok(())
}
fn remote_plugin_bundle_tar_gz_bytes(plugin_name: &str) -> Result<Vec<u8>> {
let manifest = format!(r#"{{"name":"{plugin_name}"}}"#);
remote_plugin_bundle_tar_gz_bytes_with_contents(&manifest, /*app_manifest*/ None)
}
fn remote_plugin_bundle_tar_gz_bytes_with_mcp_config(
plugin_name: &str,
mcp_base_url: &str,
) -> Result<Vec<u8>> {
let manifest = format!(r#"{{"name":"{plugin_name}"}}"#);
let mcp_config = format!(
r#"{{
"mcpServers": {{
"sample-mcp": {{
"type": "http",
"url": "{mcp_base_url}/mcp"
}}
}}
}}"#
);
remote_plugin_bundle_tar_gz_bytes_with_entries(
&manifest,
/*app_manifest*/ None,
Some(mcp_config.as_str()),
)
}
fn remote_plugin_bundle_tar_gz_bytes_with_app_and_mcp_config(
plugin_name: &str,
app_manifest: &str,
mcp_base_url: &str,
) -> Result<Vec<u8>> {
let manifest = format!(r#"{{"name":"{plugin_name}"}}"#);
let mcp_config = format!(
r#"{{
"mcpServers": {{
"sample-mcp": {{
"type": "http",
"url": "{mcp_base_url}/mcp"
}}
}}
}}"#
);
remote_plugin_bundle_tar_gz_bytes_with_entries(
&manifest,
Some(app_manifest),
Some(mcp_config.as_str()),
)
}
fn remote_plugin_bundle_tar_gz_bytes_with_contents(
plugin_manifest: &str,
app_manifest: Option<&str>,
) -> Result<Vec<u8>> {
remote_plugin_bundle_tar_gz_bytes_with_entries(
plugin_manifest,
app_manifest,
/*mcp_config*/ None,
)
}
fn remote_plugin_bundle_tar_gz_bytes_with_entries(
plugin_manifest: &str,
app_manifest: Option<&str>,
mcp_config: Option<&str>,
) -> Result<Vec<u8>> {
let skill = "# Plan Work\n\nTrack work in Linear.\n";
let encoder = GzEncoder::new(Vec::new(), Compression::default());
let mut tar = tar::Builder::new(encoder);
let mut entries = vec![
(
".codex-plugin/plugin.json",
plugin_manifest.as_bytes(),
/*mode*/ 0o644,
),
(
"skills/plan-work/SKILL.md",
skill.as_bytes(),
/*mode*/ 0o644,
),
];
if let Some(app_manifest) = app_manifest {
entries.push((".app.json", app_manifest.as_bytes(), /*mode*/ 0o644));
}
if let Some(mcp_config) = mcp_config {
entries.push((".mcp.json", mcp_config.as_bytes(), /*mode*/ 0o644));
}
for (path, contents, mode) in entries {
let mut header = tar::Header::new_gnu();
header.set_size(contents.len() as u64);
header.set_mode(mode);
header.set_cksum();
tar.append_data(&mut header, path, contents)?;
}
Ok(tar.into_inner()?.finish()?)
}