Streamline plugin, apps, and skills handlers (#19490)

## Why

The plugin, app, and skills handlers had a lot of repeated
`send_error`/`return` branches that made the success path hard to scan.
This slice keeps behavior the same while moving fallible steps into
local response-producing helpers, so the request boundary can send one
result.

## What Changed

- Converted plugin list/install/uninstall handlers in
`codex-rs/app-server/src/codex_message_processor/plugins.rs` to return
`Result<*Response, JSONRPCErrorError>` from helper methods and call
`send_result` once.
- Added local error-mapping helpers for plugin install/uninstall and
marketplace failures.
- Applied the same mechanical shape to app list, skills list/config, and
marketplace add/remove/upgrade handlers in
`codex-rs/app-server/src/codex_message_processor.rs`.

## Verification

- `cargo check -p codex-app-server`
- `cargo test -p codex-app-server --test all v2::app_list --
--test-threads=1`
- `cargo test -p codex-app-server --test all v2::plugin_ --
--test-threads=1`
- `cargo test -p codex-app-server --test all v2::skills_list --
--test-threads=1`
This commit is contained in:
pakrym-oai
2026-04-27 10:18:25 -07:00
committed by GitHub
Unverified
parent 48dd7b58f0
commit 4ed22fc7d2
2 changed files with 333 additions and 593 deletions
+128 -239
View File
@@ -7,6 +7,7 @@ use crate::error_code::INPUT_TOO_LARGE_ERROR_CODE;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_PARAMS_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::error_code::invalid_params;
use crate::fuzzy_file_search::FuzzyFileSearchSession;
use crate::fuzzy_file_search::run_fuzzy_file_search;
use crate::fuzzy_file_search::start_fuzzy_file_search_session;
@@ -6323,6 +6324,15 @@ impl CodexMessageProcessor {
self.outgoing.send_error(request_id, error).await;
}
async fn send_internal_error(&self, request_id: ConnectionRequestId, message: String) {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
fn input_too_large_error(actual_chars: usize) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
@@ -6345,41 +6355,6 @@ impl CodexMessageProcessor {
Ok(())
}
async fn send_internal_error(&self, request_id: ConnectionRequestId, message: String) {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
async fn send_marketplace_error(
&self,
request_id: ConnectionRequestId,
err: MarketplaceError,
action: &str,
) {
match err {
MarketplaceError::MarketplaceNotFound { .. } => {
self.send_invalid_request_error(request_id, err.to_string())
.await;
}
MarketplaceError::Io { .. } => {
self.send_internal_error(request_id, format!("failed to {action}: {err}"))
.await;
}
MarketplaceError::InvalidMarketplaceFile { .. }
| MarketplaceError::PluginNotFound { .. }
| MarketplaceError::PluginNotAvailable { .. }
| MarketplaceError::PluginsDisabled
| MarketplaceError::InvalidPlugin(_) => {
self.send_invalid_request_error(request_id, err.to_string())
.await;
}
}
}
async fn wait_for_thread_shutdown(thread: &Arc<CodexThread>) -> ThreadShutdownResult {
match tokio::time::timeout(Duration::from_secs(10), thread.shutdown_and_wait()).await {
Ok(Ok(())) => ThreadShutdownResult::Complete,
@@ -6458,34 +6433,33 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: ThreadUnsubscribeParams,
) {
let thread_id = match ThreadId::from_string(&params.thread_id) {
Ok(id) => id,
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
.await;
return;
}
};
let result = self
.thread_unsubscribe_response(params, request_id.connection_id)
.await;
self.outgoing.send_result(request_id, result).await;
}
async fn thread_unsubscribe_response(
&self,
params: ThreadUnsubscribeParams,
connection_id: ConnectionId,
) -> Result<ThreadUnsubscribeResponse, JSONRPCErrorError> {
let thread_id = ThreadId::from_string(&params.thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
if self.thread_manager.get_thread(thread_id).await.is_err() {
// Reconcile stale app-server bookkeeping when the thread has already been
// removed from the core manager. This keeps loaded-status/subscription state
// consistent with the source of truth before reporting NotLoaded.
self.finalize_thread_teardown(thread_id).await;
self.outgoing
.send_response(
request_id,
ThreadUnsubscribeResponse {
status: ThreadUnsubscribeStatus::NotLoaded,
},
)
.await;
return;
return Ok(ThreadUnsubscribeResponse {
status: ThreadUnsubscribeStatus::NotLoaded,
});
};
let was_subscribed = self
.thread_state_manager
.unsubscribe_connection_from_thread(thread_id, request_id.connection_id)
.unsubscribe_connection_from_thread(thread_id, connection_id)
.await;
let status = if was_subscribed {
@@ -6493,9 +6467,7 @@ impl CodexMessageProcessor {
} else {
ThreadUnsubscribeStatus::NotSubscribed
};
self.outgoing
.send_response(request_id, ThreadUnsubscribeResponse { status })
.await;
Ok(ThreadUnsubscribeResponse { status })
}
async fn prepare_thread_for_archive(&self, thread_id: ThreadId) {
@@ -6589,6 +6561,16 @@ impl CodexMessageProcessor {
config: Config,
environment_manager: Arc<EnvironmentManager>,
) {
let result = Self::apps_list_response(&outgoing, params, config, environment_manager).await;
outgoing.send_result(request_id, result).await;
}
async fn apps_list_response(
outgoing: &Arc<OutgoingMessageSender>,
params: AppsListParams,
config: Config,
environment_manager: Arc<EnvironmentManager>,
) -> Result<AppsListResponse, JSONRPCErrorError> {
let AppsListParams {
cursor,
limit,
@@ -6598,15 +6580,7 @@ impl CodexMessageProcessor {
let start = match cursor {
Some(cursor) => match cursor.parse::<usize>() {
Ok(idx) => idx,
Err(_) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid cursor: {cursor}"),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
}
Err(_) => return Err(invalid_request(format!("invalid cursor: {cursor}"))),
},
None => 0,
};
@@ -6660,7 +6634,7 @@ impl CodexMessageProcessor {
accessible_loaded,
all_loaded,
) {
apps_list_helpers::send_app_list_updated_notification(&outgoing, merged.clone())
apps_list_helpers::send_app_list_updated_notification(outgoing, merged.clone())
.await;
last_notified_apps = Some(merged);
}
@@ -6670,25 +6644,13 @@ impl CodexMessageProcessor {
let result = match tokio::time::timeout_at(app_list_deadline, rx.recv()).await {
Ok(Some(result)) => result,
Ok(None) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "failed to load app lists".to_string(),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
return Err(internal_error("failed to load app lists"));
}
Err(_) => {
let timeout_seconds = APP_LIST_LOAD_TIMEOUT.as_secs();
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"timed out waiting for app lists after {timeout_seconds} seconds"
),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
return Err(internal_error(format!(
"timed out waiting for app lists after {timeout_seconds} seconds"
)));
}
};
@@ -6698,26 +6660,14 @@ impl CodexMessageProcessor {
accessible_loaded = true;
}
AppListLoadResult::Accessible(Err(err)) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: err,
data: None,
};
outgoing.send_error(request_id, error).await;
return;
return Err(internal_error(err));
}
AppListLoadResult::Directory(Ok(connectors)) => {
all_connectors = Some(connectors);
all_loaded = true;
}
AppListLoadResult::Directory(Err(err)) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: err,
data: None,
};
outgoing.send_error(request_id, error).await;
return;
return Err(internal_error(err));
}
}
@@ -6747,27 +6697,26 @@ impl CodexMessageProcessor {
all_loaded,
) && last_notified_apps.as_ref() != Some(&merged)
{
apps_list_helpers::send_app_list_updated_notification(&outgoing, merged.clone())
apps_list_helpers::send_app_list_updated_notification(outgoing, merged.clone())
.await;
last_notified_apps = Some(merged.clone());
}
if accessible_loaded && all_loaded {
match apps_list_helpers::paginate_apps(merged.as_slice(), start, limit) {
Ok(response) => {
outgoing.send_response(request_id, response).await;
return;
}
Err(error) => {
outgoing.send_error(request_id, error).await;
return;
}
}
return apps_list_helpers::paginate_apps(merged.as_slice(), start, limit);
}
}
}
async fn skills_list(&self, request_id: ConnectionRequestId, params: SkillsListParams) {
let result = self.skills_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn skills_list_response(
&self,
params: SkillsListParams,
) -> Result<SkillsListResponse, JSONRPCErrorError> {
let SkillsListParams {
cwds,
force_reload,
@@ -6792,17 +6741,13 @@ impl CodexMessageProcessor {
let mut valid_extra_roots = Vec::new();
for root in entry.extra_user_roots {
let Ok(root) = AbsolutePathBuf::from_absolute_path_checked(root.as_path()) else {
self.send_invalid_request_error(
request_id,
format!(
let root =
AbsolutePathBuf::from_absolute_path_checked(root.as_path()).map_err(|_| {
invalid_request(format!(
"skills/list perCwdExtraUserRoots extraUserRoots paths must be absolute: {}",
root.display()
),
)
.await;
return;
};
))
})?;
valid_extra_roots.push(root);
}
extra_roots_by_cwd
@@ -6811,13 +6756,7 @@ impl CodexMessageProcessor {
.extend(valid_extra_roots);
}
let config = match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => config,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
let auth = self.auth_manager.auth().await;
let workspace_codex_plugins_enabled = self
.workspace_codex_plugins_enabled(&config, auth.as_ref())
@@ -6896,9 +6835,7 @@ impl CodexMessageProcessor {
errors,
});
}
self.outgoing
.send_response(request_id, SkillsListResponse { data })
.await;
Ok(SkillsListResponse { data })
}
async fn marketplace_remove(
&self,
@@ -6911,27 +6848,16 @@ impl CodexMessageProcessor {
marketplace_name: params.marketplace_name,
},
)
.await;
match result {
Ok(outcome) => {
self.outgoing
.send_response(
request_id,
MarketplaceRemoveResponse {
marketplace_name: outcome.marketplace_name,
installed_root: outcome.removed_installed_root,
},
)
.await;
}
Err(MarketplaceRemoveError::InvalidRequest(message)) => {
self.send_invalid_request_error(request_id, message).await;
}
Err(MarketplaceRemoveError::Internal(message)) => {
self.send_internal_error(request_id, message).await;
}
}
.await
.map(|outcome| MarketplaceRemoveResponse {
marketplace_name: outcome.marketplace_name,
installed_root: outcome.removed_installed_root,
})
.map_err(|err| match err {
MarketplaceRemoveError::InvalidRequest(message) => invalid_request(message),
MarketplaceRemoveError::Internal(message) => internal_error(message),
});
self.outgoing.send_result(request_id, result).await;
}
async fn marketplace_upgrade(
@@ -6939,53 +6865,38 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: MarketplaceUpgradeParams,
) {
let config = match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => config,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
};
let result = self.marketplace_upgrade_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn marketplace_upgrade_response(
&self,
params: MarketplaceUpgradeParams,
) -> Result<MarketplaceUpgradeResponse, JSONRPCErrorError> {
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
let plugins_manager = self.thread_manager.plugins_manager();
let MarketplaceUpgradeParams { marketplace_name } = params;
let result = tokio::task::spawn_blocking(move || {
let outcome = tokio::task::spawn_blocking(move || {
plugins_manager
.upgrade_configured_marketplaces_for_config(&config, marketplace_name.as_deref())
})
.await;
.await
.map_err(|err| internal_error(format!("failed to upgrade marketplaces: {err}")))?
.map_err(invalid_request)?;
match result {
Ok(Ok(outcome)) => {
self.outgoing
.send_response(
request_id,
MarketplaceUpgradeResponse {
selected_marketplaces: outcome.selected_marketplaces,
upgraded_roots: outcome.upgraded_roots,
errors: outcome
.errors
.into_iter()
.map(|err| MarketplaceUpgradeErrorInfo {
marketplace_name: err.marketplace_name,
message: err.message,
})
.collect(),
},
)
.await;
}
Ok(Err(message)) => {
self.send_invalid_request_error(request_id, message).await;
}
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to upgrade marketplaces: {err}"),
)
.await;
}
}
Ok(MarketplaceUpgradeResponse {
selected_marketplaces: outcome.selected_marketplaces,
upgraded_roots: outcome.upgraded_roots,
errors: outcome
.errors
.into_iter()
.map(|err| MarketplaceUpgradeErrorInfo {
marketplace_name: err.marketplace_name,
message: err.message,
})
.collect(),
})
}
async fn marketplace_add(&self, request_id: ConnectionRequestId, params: MarketplaceAddParams) {
@@ -6997,28 +6908,17 @@ impl CodexMessageProcessor {
sparse_paths: params.sparse_paths.unwrap_or_default(),
},
)
.await;
match result {
Ok(outcome) => {
self.outgoing
.send_response(
request_id,
MarketplaceAddResponse {
marketplace_name: outcome.marketplace_name,
installed_root: outcome.installed_root,
already_added: outcome.already_added,
},
)
.await;
}
Err(MarketplaceAddError::InvalidRequest(message)) => {
self.send_invalid_request_error(request_id, message).await;
}
Err(MarketplaceAddError::Internal(message)) => {
self.send_internal_error(request_id, message).await;
}
}
.await
.map(|outcome| MarketplaceAddResponse {
marketplace_name: outcome.marketplace_name,
installed_root: outcome.installed_root,
already_added: outcome.already_added,
})
.map_err(|err| match err {
MarketplaceAddError::InvalidRequest(message) => invalid_request(message),
MarketplaceAddError::Internal(message) => internal_error(message),
});
self.outgoing.send_result(request_id, result).await;
}
async fn skills_config_write(
@@ -7026,6 +6926,14 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: SkillsConfigWriteParams,
) {
let result = self.skills_config_write_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn skills_config_write_response(
&self,
params: SkillsConfigWriteParams,
) -> Result<SkillsConfigWriteResponse, JSONRPCErrorError> {
let SkillsConfigWriteParams {
path,
name,
@@ -7040,43 +6948,24 @@ impl CodexMessageProcessor {
ConfigEdit::SetSkillConfigByName { name, enabled }
}
_ => {
let error = JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
message: "skills/config/write requires exactly one of path or name".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
return Err(invalid_params(
"skills/config/write requires exactly one of path or name",
));
}
};
let edits = vec![edit];
let result = ConfigEditsBuilder::new(&self.config.codex_home)
ConfigEditsBuilder::new(&self.config.codex_home)
.with_edits(edits)
.apply()
.await;
match result {
Ok(()) => {
.await
.map(|()| {
self.thread_manager.plugins_manager().clear_cache();
self.thread_manager.skills_manager().clear_cache();
self.outgoing
.send_response(
request_id,
SkillsConfigWriteResponse {
effective_enabled: enabled,
},
)
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to update skill settings: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
SkillsConfigWriteResponse {
effective_enabled: enabled,
}
})
.map_err(|err| internal_error(format!("failed to update skill settings: {err}")))
}
async fn turn_start(
@@ -1,4 +1,6 @@
use super::*;
use crate::error_code::internal_error;
use crate::error_code::invalid_request;
use codex_app_server_protocol::PluginInstallPolicy;
impl CodexMessageProcessor {
@@ -7,46 +9,33 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: PluginListParams,
) {
let result = self.plugin_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_list_response(
&self,
params: PluginListParams,
) -> Result<PluginListResponse, JSONRPCErrorError> {
let plugins_manager = self.thread_manager.plugins_manager();
let PluginListParams { cwds } = params;
let roots = cwds.unwrap_or_default();
let config = match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => config,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
let empty_response = || PluginListResponse {
marketplaces: Vec::new(),
marketplace_load_errors: Vec::new(),
featured_plugin_ids: Vec::new(),
};
if !config.features.enabled(Feature::Plugins) {
self.outgoing
.send_response(
request_id,
PluginListResponse {
marketplaces: Vec::new(),
marketplace_load_errors: Vec::new(),
featured_plugin_ids: Vec::new(),
},
)
.await;
return;
return Ok(empty_response());
}
let auth = self.auth_manager.auth().await;
if !self
.workspace_codex_plugins_enabled(&config, auth.as_ref())
.await
{
self.outgoing
.send_response(
request_id,
PluginListResponse {
marketplaces: Vec::new(),
marketplace_load_errors: Vec::new(),
featured_plugin_ids: Vec::new(),
},
)
.await;
return;
return Ok(empty_response());
}
plugins_manager.maybe_start_non_curated_plugin_cache_refresh(&roots);
@@ -100,18 +89,11 @@ impl CodexMessageProcessor {
.await
{
Ok(Ok(outcome)) => outcome,
Ok(Err(err)) => {
self.send_marketplace_error(request_id, err, "list marketplace plugins")
.await;
return;
}
Ok(Err(err)) => return Err(Self::marketplace_error(err, "list marketplace plugins")),
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to list marketplace plugins: {err}"),
)
.await;
return;
return Err(internal_error(format!(
"failed to list marketplace plugins: {err}"
)));
}
};
@@ -174,16 +156,11 @@ impl CodexMessageProcessor {
Vec::new()
};
self.outgoing
.send_response(
request_id,
PluginListResponse {
marketplaces: data,
marketplace_load_errors,
featured_plugin_ids,
},
)
.await;
Ok(PluginListResponse {
marketplaces: data,
marketplace_load_errors,
featured_plugin_ids,
})
}
pub(super) async fn plugin_read(
@@ -191,6 +168,14 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: PluginReadParams,
) {
let result = self.plugin_read_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_read_response(
&self,
params: PluginReadParams,
) -> Result<PluginReadResponse, JSONRPCErrorError> {
let plugins_manager = self.thread_manager.plugins_manager();
let PluginReadParams {
marketplace_path,
@@ -201,30 +186,16 @@ impl CodexMessageProcessor {
(Some(marketplace_path), None) => Ok(marketplace_path),
(None, Some(remote_marketplace_name)) => Err(remote_marketplace_name),
(Some(_), Some(_)) | (None, None) => {
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "plugin/read requires exactly one of marketplacePath or remoteMarketplaceName".to_string(),
data: None,
},
)
.await;
return;
return Err(invalid_request(
"plugin/read requires exactly one of marketplacePath or remoteMarketplaceName",
));
}
};
let config_cwd = read_source.as_ref().ok().and_then(|marketplace_path| {
marketplace_path.as_path().parent().map(Path::to_path_buf)
});
let config = match self.load_latest_config(config_cwd).await {
Ok(config) => config,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
};
let config = self.load_latest_config(config_cwd).await?;
let plugin = match read_source {
Ok(marketplace_path) => {
@@ -232,17 +203,10 @@ impl CodexMessageProcessor {
plugin_name,
marketplace_path,
};
let outcome = match plugins_manager
let outcome = plugins_manager
.read_plugin_for_config(&config, &request)
.await
{
Ok(outcome) => outcome,
Err(err) => {
self.send_marketplace_error(request_id, err, "read plugin details")
.await;
return;
}
};
.map_err(|err| Self::marketplace_error(err, "read plugin details"))?;
let environment_manager = self.thread_manager.environment_manager();
let app_summaries = plugin_app_helpers::load_plugin_app_summaries(
&config,
@@ -287,19 +251,9 @@ impl CodexMessageProcessor {
if !config.features.enabled(Feature::Plugins)
|| !config.features.enabled(Feature::RemotePlugin)
{
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"remote plugin read is not enabled for marketplace {remote_marketplace_name}"
),
data: None,
},
)
.await;
return;
return Err(invalid_request(format!(
"remote plugin read is not enabled for marketplace {remote_marketplace_name}"
)));
}
let auth = self.auth_manager.auth().await;
let remote_plugin_service_config = RemotePluginServiceConfig {
@@ -310,36 +264,20 @@ impl CodexMessageProcessor {
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '~')
{
self.send_invalid_request_error(
request_id,
"invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed"
.to_string(),
)
.await;
return;
return Err(invalid_request(
"invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed",
));
}
let remote_detail = match codex_core_plugins::remote::fetch_remote_plugin_detail(
let remote_detail = codex_core_plugins::remote::fetch_remote_plugin_detail(
&remote_plugin_service_config,
auth.as_ref(),
&remote_marketplace_name,
&plugin_name,
)
.await
{
Ok(remote_detail) => remote_detail,
Err(err) => {
self.outgoing
.send_error(
request_id,
remote_plugin_catalog_error_to_jsonrpc(
err,
"read remote plugin details",
),
)
.await;
return;
}
};
.map_err(|err| {
remote_plugin_catalog_error_to_jsonrpc(err, "read remote plugin details")
})?;
let plugin_apps = remote_detail
.app_ids
.iter()
@@ -357,9 +295,7 @@ impl CodexMessageProcessor {
}
};
self.outgoing
.send_response(request_id, PluginReadResponse { plugin })
.await;
Ok(PluginReadResponse { plugin })
}
pub(super) async fn plugin_install(
@@ -367,6 +303,14 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: PluginInstallParams,
) {
let result = self.plugin_install_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_install_response(
&self,
params: PluginInstallParams,
) -> Result<PluginInstallResponse, JSONRPCErrorError> {
let PluginInstallParams {
marketplace_path,
remote_marketplace_name,
@@ -375,44 +319,27 @@ impl CodexMessageProcessor {
let marketplace_path = match (marketplace_path, remote_marketplace_name) {
(Some(marketplace_path), None) => marketplace_path,
(None, Some(remote_marketplace_name)) => {
self.remote_plugin_install(request_id, remote_marketplace_name, plugin_name)
return self
.remote_plugin_install_response(remote_marketplace_name, plugin_name)
.await;
return;
}
(Some(_), Some(_)) | (None, None) => {
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "plugin/install requires exactly one of marketplacePath or remoteMarketplaceName".to_string(),
data: None,
},
)
.await;
return;
return Err(invalid_request(
"plugin/install requires exactly one of marketplacePath or remoteMarketplaceName",
));
}
};
let config_cwd = marketplace_path.as_path().parent().map(Path::to_path_buf);
let config = match self.load_latest_config(config_cwd.clone()).await {
Ok(config) => config,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
};
let config = self.load_latest_config(config_cwd.clone()).await?;
let auth = self.auth_manager.auth().await;
if !self
.workspace_codex_plugins_enabled(&config, auth.as_ref())
.await
{
self.send_invalid_request_error(
request_id,
"Codex plugins are disabled for this workspace".to_string(),
)
.await;
return;
return Err(invalid_request(
"Codex plugins are disabled for this workspace",
));
}
let plugins_manager = self.thread_manager.plugins_manager();
@@ -421,197 +348,103 @@ impl CodexMessageProcessor {
marketplace_path,
};
let install_result = plugins_manager.install_plugin(request).await;
match install_result {
Ok(result) => {
let config = match self.load_latest_config(config_cwd).await {
Ok(config) => config,
Err(err) => {
warn!(
"failed to reload config after plugin install, using current config: {err:?}"
);
config
}
};
self.clear_plugin_related_caches();
let plugin_mcp_servers =
load_plugin_mcp_servers(result.installed_path.as_path()).await;
if !plugin_mcp_servers.is_empty() {
if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await {
warn!(
plugin = result.plugin_id.as_key(),
"failed to queue MCP refresh after plugin install: {err:?}"
);
}
self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers)
.await;
}
let plugin_apps = load_plugin_apps(result.installed_path.as_path()).await;
let auth = self.auth_manager.auth().await;
let apps_needing_auth = self
.plugin_apps_needing_auth_for_install(
&config,
auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth),
&result.plugin_id.as_key(),
&plugin_apps,
)
.await;
self.outgoing
.send_response(
request_id,
PluginInstallResponse {
auth_policy: result.auth_policy.into(),
apps_needing_auth,
},
)
.await;
}
Err(err) => {
if err.is_invalid_request() {
self.send_invalid_request_error(request_id, err.to_string())
.await;
return;
}
match err {
CorePluginInstallError::Marketplace(err) => {
self.send_marketplace_error(request_id, err, "install plugin")
.await;
}
CorePluginInstallError::Config(err) => {
self.send_internal_error(
request_id,
format!("failed to persist installed plugin config: {err}"),
)
.await;
}
CorePluginInstallError::Remote(err) => {
self.send_internal_error(
request_id,
format!("failed to enable remote plugin: {err}"),
)
.await;
}
CorePluginInstallError::Join(err) => {
self.send_internal_error(
request_id,
format!("failed to install plugin: {err}"),
)
.await;
}
CorePluginInstallError::Store(err) => {
self.send_internal_error(
request_id,
format!("failed to install plugin: {err}"),
)
.await;
}
}
}
}
}
async fn remote_plugin_install(
&self,
request_id: ConnectionRequestId,
remote_marketplace_name: String,
plugin_name: String,
) {
let config = match self.load_latest_config(/*fallback_cwd*/ None).await {
let result = plugins_manager
.install_plugin(request)
.await
.map_err(Self::plugin_install_error)?;
let config = match self.load_latest_config(config_cwd).await {
Ok(config) => config,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
warn!(
"failed to reload config after plugin install, using current config: {err:?}"
);
config
}
};
self.clear_plugin_related_caches();
let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await;
if !plugin_mcp_servers.is_empty() {
if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await {
warn!(
plugin = result.plugin_id.as_key(),
"failed to queue MCP refresh after plugin install: {err:?}"
);
}
self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers)
.await;
}
let plugin_apps = load_plugin_apps(result.installed_path.as_path()).await;
let auth = self.auth_manager.auth().await;
let apps_needing_auth = self
.plugin_apps_needing_auth_for_install(
&config,
auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth),
&result.plugin_id.as_key(),
&plugin_apps,
)
.await;
Ok(PluginInstallResponse {
auth_policy: result.auth_policy.into(),
apps_needing_auth,
})
}
async fn remote_plugin_install_response(
&self,
remote_marketplace_name: String,
plugin_name: String,
) -> Result<PluginInstallResponse, JSONRPCErrorError> {
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
if !config.features.enabled(Feature::Plugins)
|| !config.features.enabled(Feature::RemotePlugin)
{
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"remote plugin install is not enabled for marketplace {remote_marketplace_name}"
),
data: None,
},
)
.await;
return;
return Err(invalid_request(format!(
"remote plugin install is not enabled for marketplace {remote_marketplace_name}"
)));
}
if plugin_name.is_empty()
|| !plugin_name
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '~')
{
self.send_invalid_request_error(
request_id,
"invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed"
.to_string(),
)
.await;
return;
return Err(invalid_request(
"invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed",
));
}
let auth = self.auth_manager.auth().await;
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
let remote_detail = match codex_core_plugins::remote::fetch_remote_plugin_detail(
let remote_detail = codex_core_plugins::remote::fetch_remote_plugin_detail(
&remote_plugin_service_config,
auth.as_ref(),
&remote_marketplace_name,
&plugin_name,
)
.await
{
Ok(remote_detail) => remote_detail,
Err(err) => {
self.outgoing
.send_error(
request_id,
remote_plugin_catalog_error_to_jsonrpc(
err,
"read remote plugin details before install",
),
)
.await;
return;
}
};
.map_err(|err| {
remote_plugin_catalog_error_to_jsonrpc(err, "read remote plugin details before install")
})?;
if remote_detail.summary.install_policy == PluginInstallPolicy::NotAvailable {
self.send_invalid_request_error(
request_id,
format!("remote plugin {plugin_name} is not available for install"),
)
.await;
return;
return Err(invalid_request(format!(
"remote plugin {plugin_name} is not available for install"
)));
}
if let Err(err) = codex_core_plugins::remote::install_remote_plugin(
codex_core_plugins::remote::install_remote_plugin(
&remote_plugin_service_config,
auth.as_ref(),
&remote_marketplace_name,
&plugin_name,
)
.await
{
self.outgoing
.send_error(
request_id,
remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"),
)
.await;
return;
}
.map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"))?;
self.clear_plugin_related_caches();
@@ -629,15 +462,10 @@ impl CodexMessageProcessor {
)
.await;
self.outgoing
.send_response(
request_id,
PluginInstallResponse {
auth_policy: remote_detail.summary.auth_policy,
apps_needing_auth,
},
)
.await;
Ok(PluginInstallResponse {
auth_policy: remote_detail.summary.auth_policy,
apps_needing_auth,
})
}
async fn plugin_apps_needing_auth_for_install(
@@ -709,59 +537,82 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: PluginUninstallParams,
) {
let result = self.plugin_uninstall_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_uninstall_response(
&self,
params: PluginUninstallParams,
) -> Result<PluginUninstallResponse, JSONRPCErrorError> {
let PluginUninstallParams { plugin_id } = params;
let plugins_manager = self.thread_manager.plugins_manager();
let uninstall_result = plugins_manager.uninstall_plugin(plugin_id).await;
plugins_manager
.uninstall_plugin(plugin_id)
.await
.map_err(Self::plugin_uninstall_error)?;
self.clear_plugin_related_caches();
Ok(PluginUninstallResponse {})
}
match uninstall_result {
Ok(()) => {
self.clear_plugin_related_caches();
self.outgoing
.send_response(request_id, PluginUninstallResponse {})
.await;
}
Err(err) => {
if err.is_invalid_request() {
self.send_invalid_request_error(request_id, err.to_string())
.await;
return;
}
fn plugin_install_error(err: CorePluginInstallError) -> JSONRPCErrorError {
if err.is_invalid_request() {
return invalid_request(err.to_string());
}
match err {
CorePluginUninstallError::Config(err) => {
self.send_internal_error(
request_id,
format!("failed to clear plugin config: {err}"),
)
.await;
}
CorePluginUninstallError::Remote(err) => {
self.send_internal_error(
request_id,
format!("failed to uninstall remote plugin: {err}"),
)
.await;
}
CorePluginUninstallError::Join(err) => {
self.send_internal_error(
request_id,
format!("failed to uninstall plugin: {err}"),
)
.await;
}
CorePluginUninstallError::Store(err) => {
self.send_internal_error(
request_id,
format!("failed to uninstall plugin: {err}"),
)
.await;
}
CorePluginUninstallError::InvalidPluginId(_) => {
unreachable!("invalid plugin ids are handled above");
}
}
match err {
CorePluginInstallError::Marketplace(err) => {
Self::marketplace_error(err, "install plugin")
}
CorePluginInstallError::Config(err) => {
internal_error(format!("failed to persist installed plugin config: {err}"))
}
CorePluginInstallError::Remote(err) => {
internal_error(format!("failed to enable remote plugin: {err}"))
}
CorePluginInstallError::Join(err) => {
internal_error(format!("failed to install plugin: {err}"))
}
CorePluginInstallError::Store(err) => {
internal_error(format!("failed to install plugin: {err}"))
}
}
}
fn plugin_uninstall_error(err: CorePluginUninstallError) -> JSONRPCErrorError {
if err.is_invalid_request() {
return invalid_request(err.to_string());
}
match err {
CorePluginUninstallError::Config(err) => {
internal_error(format!("failed to clear plugin config: {err}"))
}
CorePluginUninstallError::Remote(err) => {
internal_error(format!("failed to uninstall remote plugin: {err}"))
}
CorePluginUninstallError::Join(err) => {
internal_error(format!("failed to uninstall plugin: {err}"))
}
CorePluginUninstallError::Store(err) => {
internal_error(format!("failed to uninstall plugin: {err}"))
}
CorePluginUninstallError::InvalidPluginId(_) => {
unreachable!("invalid plugin ids are handled above");
}
}
}
fn marketplace_error(err: MarketplaceError, action: &str) -> JSONRPCErrorError {
match err {
MarketplaceError::MarketplaceNotFound { .. }
| MarketplaceError::InvalidMarketplaceFile { .. }
| MarketplaceError::PluginNotFound { .. }
| MarketplaceError::PluginNotAvailable { .. }
| MarketplaceError::PluginsDisabled
| MarketplaceError::InvalidPlugin(_) => invalid_request(err.to_string()),
MarketplaceError::Io { .. } => internal_error(format!("failed to {action}: {err}")),
}
}
}