diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 51a6cd08c..d64db0723 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -3220,6 +3220,7 @@ name = "codex-mcp" version = "0.0.0" dependencies = [ "anyhow", + "arc-swap", "async-channel", "codex-api", "codex-async-utils", @@ -3818,6 +3819,7 @@ dependencies = [ "codex-core-skills", "codex-exec-server", "codex-extension-api", + "codex-mcp", "codex-protocol", "codex-utils-absolute-path", "codex-utils-string", diff --git a/codex-rs/app-server/src/extensions.rs b/codex-rs/app-server/src/extensions.rs index 43c2035ce..3293f3a09 100644 --- a/codex-rs/app-server/src/extensions.rs +++ b/codex-rs/app-server/src/extensions.rs @@ -76,7 +76,10 @@ where codex_skills_extension::install_with_providers( &mut builder, codex_skills_extension::SkillProviders::new() - .with_executor_provider(executor_skill_provider), + .with_executor_provider(executor_skill_provider) + .with_orchestrator_provider(Arc::new( + codex_skills_extension::OrchestratorSkillProvider::new(), + )), ); Arc::new(builder.build()) } diff --git a/codex-rs/app-server/tests/suite/v2/mcp_resource.rs b/codex-rs/app-server/tests/suite/v2/mcp_resource.rs index 2e3ffcb06..d1d8d693d 100644 --- a/codex-rs/app-server/tests/suite/v2/mcp_resource.rs +++ b/codex-rs/app-server/tests/suite/v2/mcp_resource.rs @@ -19,6 +19,8 @@ use codex_app_server_protocol::McpResourceReadResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::UserInput; use codex_arg0::Arg0DispatchPaths; use codex_config::CloudConfigBundleLoader; use codex_config::LoaderOverrides; @@ -30,9 +32,14 @@ use codex_protocol::protocol::SessionSource; use core_test_support::responses; use pretty_assertions::assert_eq; use rmcp::handler::server::ServerHandler; +use rmcp::model::ListResourcesResult; +use rmcp::model::Meta; +use rmcp::model::PaginatedRequestParams; use rmcp::model::ProtocolVersion; +use rmcp::model::RawResource; use rmcp::model::ReadResourceRequestParams; use rmcp::model::ReadResourceResult; +use rmcp::model::Resource; use rmcp::model::ResourceContents; use rmcp::model::ServerCapabilities; use rmcp::model::ServerInfo; @@ -41,6 +48,7 @@ use rmcp::service::RoleServer; use rmcp::transport::StreamableHttpServerConfig; use rmcp::transport::StreamableHttpService; use rmcp::transport::streamable_http_server::session::local::LocalSessionManager; +use serde_json::json; use tempfile::TempDir; use tokio::net::TcpListener; use tokio::task::JoinHandle; @@ -51,49 +59,21 @@ const TEST_RESOURCE_URI: &str = "test://codex/resource"; const TEST_BLOB_RESOURCE_URI: &str = "test://codex/resource.bin"; const TEST_RESOURCE_BLOB: &str = "YmluYXJ5LXJlc291cmNl"; const TEST_RESOURCE_TEXT: &str = "Resource body from the MCP server."; +const SKILL_NAME: &str = "demo-plugin:deploy"; +const RAW_SKILL_DESCRIPTION: &str = "Deploy\nthrough the orchestrator."; +const SKILL_DESCRIPTION: &str = "Deploy through the <hosted> orchestrator."; +const SKILL_RESOURCE_URI: &str = "skill://plugin_demo/deploy"; +const SKILL_MAIN_PROMPT_URI: &str = "skill://plugin_demo/deploy/SKILL.md"; +const SKILL_MARKER: &str = "ORCHESTRATOR_SKILL_BODY_MARKER"; +const SKILL_CONTENTS: &str = "---\nname: deploy\ndescription: Deploy through the orchestrator.\n---\n\n# Deploy\n\nORCHESTRATOR_SKILL_BODY_MARKER\n"; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn mcp_resource_read_returns_resource_contents() -> Result<()> { let responses_server = responses::start_mock_server().await; let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?; - - let codex_home = TempDir::new()?; let responses_server_uri = responses_server.uri(); - std::fs::write( - codex_home.path().join("config.toml"), - format!( - r#" -model = "mock-model" -approval_policy = "untrusted" -sandbox_mode = "read-only" - -model_provider = "mock_provider" -chatgpt_base_url = "{apps_server_url}" -mcp_oauth_credentials_store = "file" - -[features] -apps = true - -[model_providers.mock_provider] -name = "Mock provider for test" -base_url = "{responses_server_uri}/v1" -wire_api = "responses" -request_max_retries = 0 -stream_max_retries = 0 -"# - ), - )?; - write_chatgpt_auth( - codex_home.path(), - ChatGptAuthFixture::new("chatgpt-token") - .account_id("account-123") - .chatgpt_user_id("user-123") - .chatgpt_account_id("account-123"), - AuthCredentialsStoreMode::File, - )?; - - let mut mcp = TestAppServer::new(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + let (_codex_home, mut mcp) = + start_resource_test_app_server(&apps_server_url, &responses_server_uri).await?; let thread_start_id = mcp .send_thread_start_request(ThreadStartParams { @@ -120,7 +100,6 @@ stream_max_retries = 0 mcp.read_stream_until_response_message(RequestId::Integer(read_request_id)), ) .await??; - assert_eq!( to_response::(read_response)?, expected_resource_read_response() @@ -131,6 +110,87 @@ stream_max_retries = 0 Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn codex_apps_resources_support_orchestrator_skills_without_an_environment() -> Result<()> { + let responses_server = responses::start_mock_server().await; + let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?; + let responses_server_uri = responses_server.uri(); + let (_codex_home, mut mcp) = + start_resource_test_app_server(&apps_server_url, &responses_server_uri).await?; + + let thread_start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + environments: Some(Vec::new()), + ..Default::default() + }) + .await?; + let thread_start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response(thread_start_resp)?; + + let response_mock = responses::mount_sse_once( + &responses_server, + responses::sse(vec![ + responses::ev_response_created("resp-orchestrator-skill"), + responses::ev_assistant_message("msg-orchestrator-skill", "Done"), + responses::ev_completed("resp-orchestrator-skill"), + ]), + ) + .await; + let turn_start_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id, + input: vec![UserInput::Text { + text: format!("Use ${SKILL_NAME}"), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let request = response_mock.single_request(); + let developer_messages = request.message_input_texts("developer"); + let catalog_line = format!("- {SKILL_NAME}: {SKILL_DESCRIPTION} (file: {SKILL_RESOURCE_URI})"); + assert_eq!( + 1, + developer_messages + .iter() + .filter(|text| text.contains(&catalog_line)) + .count() + ); + assert!( + developer_messages + .iter() + .all(|text| !text.contains("ignored-plugin:ignored")) + ); + let skill_fragments = request + .message_input_texts("user") + .into_iter() + .filter(|text| text.starts_with("")) + .collect::>(); + assert_eq!(1, skill_fragments.len()); + assert!(skill_fragments[0].contains(&format!("{SKILL_NAME}"))); + assert!(skill_fragments[0].contains(SKILL_MARKER)); + + apps_server_handle.abort(); + let _ = apps_server_handle.await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn mcp_resource_read_returns_resource_contents_without_thread() -> Result<()> { let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?; @@ -246,6 +306,52 @@ async fn mcp_resource_read_returns_error_for_unknown_thread() -> Result<()> { Ok(()) } +async fn start_resource_test_app_server( + apps_server_url: &str, + responses_server_uri: &str, +) -> Result<(TempDir, TestAppServer)> { + let codex_home = TempDir::new()?; + std::fs::write( + codex_home.path().join("config.toml"), + format!( + r#" +model = "mock-model" +approval_policy = "untrusted" +sandbox_mode = "read-only" + +model_provider = "mock_provider" +chatgpt_base_url = "{apps_server_url}" +mcp_oauth_credentials_store = "file" + +[features] +apps = true + +[skills] +include_instructions = true + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{responses_server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + )?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + Ok((codex_home, mcp)) +} + async fn start_resource_apps_mcp_server() -> Result<(String, JoinHandle<()>)> { let listener = TcpListener::bind("127.0.0.1:0").await?; let addr = listener.local_addr()?; @@ -292,12 +398,69 @@ impl ServerHandler for ResourceAppsMcpServer { .with_protocol_version(ProtocolVersion::V_2025_06_18) } + async fn list_resources( + &self, + request: Option, + _context: RequestContext, + ) -> Result { + let cursor = request.and_then(|request| request.cursor); + if cursor.is_none() { + return Ok(ListResourcesResult { + resources: vec![skill_resource( + "skill://plugin_ignored/ignored", + "plugin_ignored/ignored", + "Not an MCP skill resource.", + "text/plain", + "ignored-plugin", + "ignored", + )], + next_cursor: Some("skills-page".to_string()), + meta: None, + }); + } + if cursor.as_deref() == Some("failing-page") { + return Err(rmcp::ErrorData::internal_error( + "simulated later-page failure", + /*data*/ None, + )); + } + if cursor.as_deref() != Some("skills-page") { + return Err(rmcp::ErrorData::invalid_params( + "unexpected resources/list cursor", + /*data*/ None, + )); + } + + Ok(ListResourcesResult { + resources: vec![skill_resource( + SKILL_RESOURCE_URI, + "plugin_demo/deploy", + RAW_SKILL_DESCRIPTION, + "mcp/skill", + "demo-plugin", + "deploy", + )], + next_cursor: Some("failing-page".to_string()), + meta: None, + }) + } + async fn read_resource( &self, request: ReadResourceRequestParams, _context: RequestContext, ) -> Result { let uri = request.uri; + if uri == SKILL_MAIN_PROMPT_URI { + return Ok(ReadResourceResult::new(vec![ + ResourceContents::TextResourceContents { + uri: SKILL_MAIN_PROMPT_URI.to_string(), + mime_type: Some("text/markdown".to_string()), + text: SKILL_CONTENTS.to_string(), + meta: None, + }, + ])); + } if uri != TEST_RESOURCE_URI { return Err(rmcp::ErrorData::resource_not_found( format!("resource not found: {uri}"), @@ -321,3 +484,27 @@ impl ServerHandler for ResourceAppsMcpServer { ])) } } + +fn skill_resource( + uri: &str, + name: &str, + description: &str, + mime_type: &str, + plugin_name: &str, + skill_name: &str, +) -> Resource { + Resource::new( + RawResource::new(uri, name) + .with_description(description) + .with_mime_type(mime_type) + .with_meta(skill_resource_meta(plugin_name, skill_name)), + /*annotations*/ None, + ) +} + +fn skill_resource_meta(plugin_name: &str, skill_name: &str) -> Meta { + Meta(serde_json::Map::from_iter([ + ("plugin_name".to_string(), json!(plugin_name)), + ("skill_name".to_string(), json!(skill_name)), + ])) +} diff --git a/codex-rs/codex-mcp/Cargo.toml b/codex-rs/codex-mcp/Cargo.toml index d7634f0aa..433cbde8b 100644 --- a/codex-rs/codex-mcp/Cargo.toml +++ b/codex-rs/codex-mcp/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] anyhow = { workspace = true } +arc-swap = { workspace = true } async-channel = { workspace = true } codex-async-utils = { workspace = true } codex-api = { workspace = true } diff --git a/codex-rs/codex-mcp/src/connection_manager.rs b/codex-rs/codex-mcp/src/connection_manager.rs index 659d35718..9d1be43d4 100644 --- a/codex-rs/codex-mcp/src/connection_manager.rs +++ b/codex-rs/codex-mcp/src/connection_manager.rs @@ -348,6 +348,10 @@ impl McpConnectionManager { !self.clients.is_empty() } + pub(crate) fn contains_server(&self, server_name: &str) -> bool { + self.clients.contains_key(server_name) + } + /// Stop all MCP clients owned by this manager and terminate stdio server processes. pub async fn shutdown(&self) { self.startup_cancellation_token.cancel(); diff --git a/codex-rs/codex-mcp/src/lib.rs b/codex-rs/codex-mcp/src/lib.rs index f0c7c3c8c..195e8614e 100644 --- a/codex-rs/codex-mcp/src/lib.rs +++ b/codex-rs/codex-mcp/src/lib.rs @@ -3,6 +3,9 @@ pub use connection_manager::tool_is_model_visible; pub use elicitation::ElicitationReviewRequest; pub use elicitation::ElicitationReviewer; pub use elicitation::ElicitationReviewerHandle; +pub use resource_client::McpResourceClient; +pub use resource_client::McpResourcePage; +pub use resource_client::McpResourceReadResult; pub use rmcp_client::MCP_SANDBOX_STATE_META_CAPABILITY; pub use runtime::McpRuntimeContext; pub use runtime::SandboxState; @@ -58,6 +61,7 @@ pub(crate) mod codex_apps; pub(crate) mod connection_manager; pub(crate) mod elicitation; pub(crate) mod mcp; +mod resource_client; pub(crate) mod rmcp_client; pub(crate) mod runtime; pub(crate) mod server; diff --git a/codex-rs/codex-mcp/src/resource_client.rs b/codex-rs/codex-mcp/src/resource_client.rs new file mode 100644 index 000000000..85dc974f2 --- /dev/null +++ b/codex-rs/codex-mcp/src/resource_client.rs @@ -0,0 +1,108 @@ +use std::sync::Arc; + +use anyhow::Context; +use anyhow::Result; +use arc_swap::ArcSwap; +use codex_protocol::mcp::Resource; +use codex_protocol::mcp::ResourceContent; +use rmcp::model::PaginatedRequestParams; +use rmcp::model::ReadResourceRequestParams; + +use crate::McpConnectionManager; + +/// One page of resources returned by an MCP server. +#[derive(Clone, Debug, PartialEq)] +pub struct McpResourcePage { + /// Resources advertised on this page. + pub resources: Vec, + /// Opaque cursor to supply when requesting the next page. + pub next_cursor: Option, +} + +/// Contents returned after reading one MCP resource. +#[derive(Clone, Debug, PartialEq)] +pub struct McpResourceReadResult { + /// Text or blob content returned for the requested resource. + pub contents: Vec, +} + +/// Session-scoped access to MCP resources through the currently installed manager. +/// +/// The client retains the manager's shared publication handle rather than a manager +/// snapshot, so calls automatically use replacements installed during startup and refresh. +#[derive(Clone)] +pub struct McpResourceClient { + manager: Arc>, +} + +impl std::fmt::Debug for McpResourceClient { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter + .debug_struct("McpResourceClient") + .finish_non_exhaustive() + } +} + +impl McpResourceClient { + /// Creates a resource client backed by the session's replaceable MCP manager. + pub fn new(manager: Arc>) -> Self { + Self { manager } + } + + /// Returns whether the current manager contains the named server. + /// + /// This does not wait for server startup or imply that startup succeeded. + pub async fn has_server(&self, server: &str) -> bool { + self.manager.load_full().contains_server(server) + } + + /// Lists one resource page from the named server. + pub async fn list_resources( + &self, + server: &str, + cursor: Option, + ) -> Result { + let params = + cursor.map(|cursor| PaginatedRequestParams::default().with_cursor(Some(cursor))); + let result = self + .manager + .load_full() + .list_resources(server, params) + .await?; + let resources = result + .resources + .into_iter() + .map(resource_from_rmcp) + .collect::>>()?; + Ok(McpResourcePage { + resources, + next_cursor: result.next_cursor, + }) + } + + /// Reads one resource from the named server. + pub async fn read_resource(&self, server: &str, uri: &str) -> Result { + let result = self + .manager + .load_full() + .read_resource(server, ReadResourceRequestParams::new(uri.to_string())) + .await?; + let contents = result + .contents + .into_iter() + .map(resource_content_from_rmcp) + .collect::>>()?; + Ok(McpResourceReadResult { contents }) + } +} + +fn resource_from_rmcp(resource: rmcp::model::Resource) -> Result { + let value = serde_json::to_value(resource).context("failed to serialize MCP resource")?; + Resource::from_mcp_value(value).context("failed to convert MCP resource") +} + +fn resource_content_from_rmcp(content: rmcp::model::ResourceContents) -> Result { + let value = + serde_json::to_value(content).context("failed to serialize MCP resource content")?; + serde_json::from_value(value).context("failed to convert MCP resource content") +} diff --git a/codex-rs/core/src/context/available_skills_instructions.rs b/codex-rs/core/src/context/available_skills_instructions.rs index f7921072b..4e166d436 100644 --- a/codex-rs/core/src/context/available_skills_instructions.rs +++ b/codex-rs/core/src/context/available_skills_instructions.rs @@ -5,12 +5,23 @@ use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG; use super::ContextualUserFragment; +/// Model-context fragment describing the skills available to Codex. #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct AvailableSkillsInstructions { +pub struct AvailableSkillsInstructions { skill_root_lines: Vec, skill_lines: Vec, } +impl AvailableSkillsInstructions { + /// Creates a skills context fragment from pre-rendered catalog lines. + pub fn from_skill_lines(skill_lines: Vec) -> Self { + Self { + skill_root_lines: Vec::new(), + skill_lines, + } + } +} + impl From for AvailableSkillsInstructions { fn from(available_skills: AvailableSkills) -> Self { Self { diff --git a/codex-rs/core/src/context/mod.rs b/codex-rs/core/src/context/mod.rs index d59d6e870..a470c3afb 100644 --- a/codex-rs/core/src/context/mod.rs +++ b/codex-rs/core/src/context/mod.rs @@ -31,7 +31,7 @@ mod user_shell_command; pub(crate) use approved_command_prefix_saved::ApprovedCommandPrefixSaved; pub(crate) use apps_instructions::AppsInstructions; pub(crate) use available_plugins_instructions::AvailablePluginsInstructions; -pub(crate) use available_skills_instructions::AvailableSkillsInstructions; +pub use available_skills_instructions::AvailableSkillsInstructions; pub(crate) use codex_context_fragments::AdditionalContextDeveloperFragment; pub(crate) use codex_context_fragments::AdditionalContextUserFragment; pub use codex_context_fragments::ContextualUserFragment; diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index bdcd78ad6..e9383aef4 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -16,6 +16,7 @@ use codex_protocol::models::is_local_image_close_tag_text; use codex_protocol::models::is_local_image_open_tag_text; use codex_protocol::protocol::COLLABORATION_MODE_OPEN_TAG; use codex_protocol::protocol::REALTIME_CONVERSATION_OPEN_TAG; +use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG; use codex_protocol::user_input::UserInput; use tracing::warn; use uuid::Uuid; @@ -29,6 +30,7 @@ const CONTEXTUAL_DEVELOPER_PREFIXES: &[&str] = &[ "", COLLABORATION_MODE_OPEN_TAG, REALTIME_CONVERSATION_OPEN_TAG, + SKILLS_INSTRUCTIONS_OPEN_TAG, "", "", ]; diff --git a/codex-rs/core/src/event_mapping_tests.rs b/codex-rs/core/src/event_mapping_tests.rs index ab8cbdd4a..6f9dad64b 100644 --- a/codex-rs/core/src/event_mapping_tests.rs +++ b/codex-rs/core/src/event_mapping_tests.rs @@ -15,9 +15,19 @@ use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::ResponseItem; use codex_protocol::models::WebSearchAction; +use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG; use codex_protocol::user_input::UserInput; use pretty_assertions::assert_eq; +#[test] +fn recognizes_skills_instructions_as_contextual_developer_content() { + assert!(is_contextual_dev_message_content(&[ + ContentItem::InputText { + text: format!("{SKILLS_INSTRUCTIONS_OPEN_TAG}\n## Skills"), + }, + ])); +} + #[test] fn recognizes_token_budget_as_contextual_developer_content() { let content = vec![ContentItem::InputText { diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index c97053480..4da2b58ac 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -65,6 +65,7 @@ use codex_login::CodexAuth; use codex_login::auth_env_telemetry::collect_auth_env_telemetry; use codex_login::default_client::originator; use codex_mcp::McpConnectionManager; +use codex_mcp::McpResourceClient; use codex_mcp::McpRuntimeContext; use codex_mcp::codex_apps_tools_cache_key; use codex_models_manager::manager::RefreshStrategy; diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index e42cf202e..3da05ed0f 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -946,8 +946,20 @@ impl Session { .effective_agent_max_threads(MultiAgentVersion::V2) .unwrap_or(usize::MAX), ); + // Keep one stable manager handle for the session so extension resource clients + // automatically observe the manager installed at startup and on later refreshes. + let mcp_connection_manager = Arc::new(arc_swap::ArcSwap::from_pointee( + McpConnectionManager::new_uninitialized_with_permission_profile( + &config.permissions.approval_policy, + config.permissions.permission_profile(), + config.prefix_mcp_tool_names(), + ), + )); let session_extension_data = codex_extension_api::ExtensionData::new(session_id.to_string()); + session_extension_data.insert(McpResourceClient::new(Arc::clone( + &mcp_connection_manager, + ))); let thread_extension_data = codex_extension_api::ExtensionData::new_with_init( thread_id.to_string(), thread_extension_init, @@ -970,13 +982,7 @@ impl Session { // before any MCP-related events. It is reasonable to consider // changing this to use Option or OnceCell, though the current // setup is straightforward enough and performs well. - mcp_connection_manager: arc_swap::ArcSwap::from_pointee( - McpConnectionManager::new_uninitialized_with_permission_profile( - &config.permissions.approval_policy, - config.permissions.permission_profile(), - config.prefix_mcp_tool_names(), - ), - ), + mcp_connection_manager, mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 843f8db88..4079deb6b 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -4940,13 +4940,13 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { ); let services = SessionServices { - mcp_connection_manager: arc_swap::ArcSwap::from_pointee( + mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from_pointee( McpConnectionManager::new_uninitialized_with_permission_profile( &config.permissions.approval_policy, config.permissions.permission_profile(), config.prefix_mcp_tool_names(), ), - ), + )), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -7018,13 +7018,13 @@ where ); let services = SessionServices { - mcp_connection_manager: arc_swap::ArcSwap::from_pointee( + mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from_pointee( McpConnectionManager::new_uninitialized_with_permission_profile( &config.permissions.approval_policy, config.permissions.permission_profile(), config.prefix_mcp_tool_names(), ), - ), + )), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index b4c499118..d8d91e85a 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -40,7 +40,7 @@ use tokio_util::sync::CancellationToken; pub(crate) struct SessionServices { /// The latest manager; callers retain an owned handle while performing MCP I/O. - pub(crate) mcp_connection_manager: ArcSwap, + pub(crate) mcp_connection_manager: Arc>, pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, #[cfg_attr(not(unix), allow(dead_code))] diff --git a/codex-rs/ext/skills/Cargo.toml b/codex-rs/ext/skills/Cargo.toml index c67d4b0e1..6b4affc0c 100644 --- a/codex-rs/ext/skills/Cargo.toml +++ b/codex-rs/ext/skills/Cargo.toml @@ -18,9 +18,11 @@ codex-core = { workspace = true } codex-core-skills = { workspace = true } codex-exec-server = { workspace = true } codex-extension-api = { workspace = true } +codex-mcp = { workspace = true } codex-protocol = { workspace = true } codex-utils-absolute-path = { workspace = true } codex-utils-string = { workspace = true } +tokio = { workspace = true, features = ["sync", "time"] } [dev-dependencies] async-trait = { workspace = true } diff --git a/codex-rs/ext/skills/src/catalog.rs b/codex-rs/ext/skills/src/catalog.rs index c966cc09e..739c5320b 100644 --- a/codex-rs/ext/skills/src/catalog.rs +++ b/codex-rs/ext/skills/src/catalog.rs @@ -9,8 +9,8 @@ pub enum SkillSourceKind { Host, /// Skills owned by an execution environment. Executor, - /// Skills read through an authenticated remote catalog/API. - Remote, + /// Skills owned by the orchestrator rather than an execution environment. + Orchestrator, /// Extension-private source kind for future providers that do not fit an /// existing transport category. Custom(String), @@ -25,7 +25,7 @@ impl SkillSourceKind { match self { Self::Host => "host", Self::Executor => "executor", - Self::Remote => "remote", + Self::Orchestrator => "orchestrator", Self::Custom(kind) => kind, } } diff --git a/codex-rs/ext/skills/src/extension.rs b/codex-rs/ext/skills/src/extension.rs index e876c66b2..ca09aa3c6 100644 --- a/codex-rs/ext/skills/src/extension.rs +++ b/codex-rs/ext/skills/src/extension.rs @@ -17,11 +17,13 @@ use codex_extension_api::ThreadLifecycleContributor; use codex_extension_api::ThreadStartInput; use codex_extension_api::TurnInputContext; use codex_extension_api::TurnInputContributor; +use codex_mcp::McpResourceClient; use codex_protocol::capabilities::SelectedCapabilityRoot; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::WarningEvent; +use crate::catalog::SkillCatalog; use crate::catalog::SkillCatalogEntry; use crate::catalog::SkillReadResult; use crate::catalog::SkillSourceKind; @@ -84,7 +86,7 @@ impl ConfigContributor for SkillsExtension { impl ContextContributor for SkillsExtension { fn contribute<'a>( &'a self, - _session_store: &'a ExtensionData, + session_store: &'a ExtensionData, thread_store: &'a ExtensionData, ) -> std::pin::Pin> + Send + 'a>> { Box::pin(async move { @@ -92,19 +94,22 @@ impl ContextContributor for SkillsExtension { return Vec::new(); }; let config = thread_state.config(); - if !config.include_instructions || thread_state.selected_roots().is_empty() { + if !config.include_instructions { return Vec::new(); } let catalog = self - .providers - .list_for_turn(SkillListQuery { - turn_id: thread_store.level_id().to_string(), - executor_roots: thread_state.selected_roots().to_vec(), - host: None, - include_host_skills: false, - include_bundled_skills: config.bundled_skills_enabled, - include_remote_skills: false, - }) + .list_skills( + SkillListQuery { + turn_id: thread_store.level_id().to_string(), + executor_roots: thread_state.selected_roots().to_vec(), + host: None, + include_host_skills: false, + include_bundled_skills: config.bundled_skills_enabled, + include_orchestrator_skills: true, + mcp_resources: session_store.get::(), + }, + &thread_state, + ) .await; for warning in &catalog.warnings { self.emit_warning(thread_store.level_id(), warning.clone()); @@ -121,7 +126,7 @@ impl TurnInputContributor for SkillsExtension { fn contribute<'a>( &'a self, input: TurnInputContext, - _session_store: &'a ExtensionData, + session_store: &'a ExtensionData, thread_store: &'a ExtensionData, turn_store: &'a ExtensionData, ) -> ExtensionFuture<'a, Vec>> { @@ -138,9 +143,10 @@ impl TurnInputContributor for SkillsExtension { host: host_loaded_skills.clone(), include_host_skills: true, include_bundled_skills: config.bundled_skills_enabled, - include_remote_skills: true, + include_orchestrator_skills: true, + mcp_resources: session_store.get::(), }; - let catalog = self.providers.list_for_turn(query).await; + let catalog = self.list_skills(query, &thread_state).await; for warning in &catalog.warnings { self.emit_warning(&input.turn_id, warning.clone()); } @@ -149,9 +155,10 @@ impl TurnInputContributor for SkillsExtension { let mut fragments: Vec> = Vec::new(); if config.include_instructions { let mut turn_catalog = catalog.clone(); - turn_catalog - .entries - .retain(|entry| entry.authority.kind != SkillSourceKind::Executor); + turn_catalog.entries.retain(|entry| { + entry.authority.kind != SkillSourceKind::Executor + && entry.authority.kind != SkillSourceKind::Orchestrator + }); if let Some(fragment) = available_skills_fragment(&turn_catalog) { fragments.push(Box::new(fragment)); } @@ -162,7 +169,7 @@ impl TurnInputContributor for SkillsExtension { let mut injected_host_skill_prompts = InjectedHostSkillPrompts::default(); for entry in &selected_entries { match self - .read_main_prompt(entry, host_loaded_skills.clone()) + .read_main_prompt(entry, host_loaded_skills.clone(), session_store) .await { Ok(read_result) => { @@ -232,10 +239,36 @@ impl TurnInputContributor for SkillsExtension { } impl SkillsExtension { + async fn list_skills( + &self, + mut query: SkillListQuery, + thread_state: &SkillsThreadState, + ) -> SkillCatalog { + let include_orchestrator_skills = query.include_orchestrator_skills; + let orchestrator_query = query.clone(); + query.include_orchestrator_skills = false; + + let mut catalog = self.providers.list_for_turn(query).await; + if include_orchestrator_skills { + match thread_state + .orchestrator_catalog_snapshot( + self.providers + .list_orchestrator_for_turn(orchestrator_query), + ) + .await + { + Ok(orchestrator_catalog) => catalog.extend(orchestrator_catalog), + Err(err) => catalog.warnings.push(err.message), + } + } + catalog + } + async fn read_main_prompt( &self, entry: &SkillCatalogEntry, host_loaded_skills: Option>, + session_store: &ExtensionData, ) -> Result { self.providers .read(SkillReadRequest { @@ -243,6 +276,7 @@ impl SkillsExtension { package: entry.id.clone(), resource: entry.main_prompt.clone(), host: host_loaded_skills, + mcp_resources: session_store.get::(), }) .await .map_err(|err| err.message) diff --git a/codex-rs/ext/skills/src/lib.rs b/codex-rs/ext/skills/src/lib.rs index 5ab03ddd9..e12ff74ee 100644 --- a/codex-rs/ext/skills/src/lib.rs +++ b/codex-rs/ext/skills/src/lib.rs @@ -10,6 +10,7 @@ pub use extension::install; pub use extension::install_with_providers; pub use provider::ExecutorSkillProvider; pub use provider::HostSkillProvider; +pub use provider::OrchestratorSkillProvider; pub use provider::SkillProvider; pub use sources::SkillProviderSource; pub use sources::SkillProviders; diff --git a/codex-rs/ext/skills/src/provider.rs b/codex-rs/ext/skills/src/provider.rs index 8d13a2f98..329c0c581 100644 --- a/codex-rs/ext/skills/src/provider.rs +++ b/codex-rs/ext/skills/src/provider.rs @@ -4,8 +4,10 @@ use std::sync::Arc; mod executor; mod host; +mod orchestrator; use codex_core_skills::HostLoadedSkills; +use codex_mcp::McpResourceClient; use codex_protocol::capabilities::SelectedCapabilityRoot; use crate::catalog::SkillAuthority; @@ -18,6 +20,7 @@ use crate::catalog::SkillSearchResult; pub use executor::ExecutorSkillProvider; pub use host::HostSkillProvider; +pub use orchestrator::OrchestratorSkillProvider; #[derive(Clone, Debug)] pub struct SkillListQuery { @@ -26,7 +29,8 @@ pub struct SkillListQuery { pub host: Option>, pub include_host_skills: bool, pub include_bundled_skills: bool, - pub include_remote_skills: bool, + pub include_orchestrator_skills: bool, + pub mcp_resources: Option>, } #[derive(Clone, Debug)] @@ -35,6 +39,7 @@ pub struct SkillReadRequest { pub package: SkillPackageId, pub resource: SkillResourceId, pub host: Option>, + pub mcp_resources: Option>, } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/codex-rs/ext/skills/src/provider/orchestrator.rs b/codex-rs/ext/skills/src/provider/orchestrator.rs new file mode 100644 index 000000000..db511c7e0 --- /dev/null +++ b/codex-rs/ext/skills/src/provider/orchestrator.rs @@ -0,0 +1,276 @@ +use std::collections::HashSet; +use std::time::Duration; + +use codex_mcp::CODEX_APPS_MCP_SERVER_NAME; +use codex_protocol::mcp::Resource; +use codex_protocol::mcp::ResourceContent; + +use crate::catalog::SkillAuthority; +use crate::catalog::SkillCatalog; +use crate::catalog::SkillCatalogEntry; +use crate::catalog::SkillPackageId; +use crate::catalog::SkillProviderError; +use crate::catalog::SkillReadResult; +use crate::catalog::SkillResourceId; +use crate::catalog::SkillSearchResult; +use crate::catalog::SkillSourceKind; +use crate::provider::SkillListQuery; +use crate::provider::SkillProvider; +use crate::provider::SkillProviderFuture; +use crate::provider::SkillReadRequest; +use crate::provider::SkillSearchRequest; + +const ORCHESTRATOR_SKILL_MIME_TYPE: &str = "mcp/skill"; +const ORCHESTRATOR_SKILL_DISCOVERY_TIMEOUT: Duration = Duration::from_secs(10); +const ORCHESTRATOR_SKILL_READ_TIMEOUT: Duration = Duration::from_secs(10); +const MAX_RESOURCE_PAGES: usize = 10; +const MAX_ORCHESTRATOR_SKILLS: usize = 100; +const MAX_SKILL_NAME_CHARS: usize = 64; +const MAX_QUALIFIED_SKILL_NAME_CHARS: usize = 128; +const MAX_SKILL_DESCRIPTION_CHARS: usize = 1_024; +const MAX_SKILL_URI_CHARS: usize = 1_024; + +/// Discovers and reads skills owned by the orchestrator. +/// +/// The provider uses session-scoped resources without exposing the transport or +/// resource server to callers that configure the skills extension. +#[derive(Clone, Debug, Default)] +pub struct OrchestratorSkillProvider; + +impl OrchestratorSkillProvider { + pub fn new() -> Self { + Self + } +} + +impl SkillProvider for OrchestratorSkillProvider { + fn list(&self, query: SkillListQuery) -> SkillProviderFuture<'_, SkillCatalog> { + Box::pin(async move { + let Some(client) = query.mcp_resources else { + return Ok(SkillCatalog::default()); + }; + if !client.has_server(CODEX_APPS_MCP_SERVER_NAME).await { + return Ok(SkillCatalog::default()); + } + + let discovery_deadline = + tokio::time::Instant::now() + ORCHESTRATOR_SKILL_DISCOVERY_TIMEOUT; + let mut catalog = SkillCatalog::default(); + let mut cursor = None; + let mut seen_cursors = HashSet::new(); + let mut skill_resources_seen = 0usize; + let mut skipped_resources = 0usize; + let mut truncated = false; + let mut completed_pages = 0usize; + + for _ in 0..MAX_RESOURCE_PAGES { + let page = match tokio::time::timeout_at( + discovery_deadline, + client.list_resources(CODEX_APPS_MCP_SERVER_NAME, cursor.clone()), + ) + .await + { + Ok(result) => result.map_err(|err| { + SkillProviderError::new(format!( + "failed to list orchestrator skill resources: {err:#}" + )) + }), + Err(_) => Err(SkillProviderError::new(format!( + "orchestrator skill discovery timed out after {ORCHESTRATOR_SKILL_DISCOVERY_TIMEOUT:?}" + ))), + }; + let result = match page { + Ok(result) => result, + Err(err) if completed_pages == 0 => return Err(err), + Err(err) => { + let page_word = if completed_pages == 1 { + "page" + } else { + "pages" + }; + catalog.warnings.push(format!( + "Orchestrator skill discovery stopped after {completed_pages} resource {page_word}: {}", + err.message + )); + cursor = None; + break; + } + }; + completed_pages = completed_pages.saturating_add(1); + + for resource in &result.resources { + if resource.mime_type.as_deref() != Some(ORCHESTRATOR_SKILL_MIME_TYPE) { + continue; + } + if skill_resources_seen >= MAX_ORCHESTRATOR_SKILLS { + truncated = true; + break; + } + skill_resources_seen = skill_resources_seen.saturating_add(1); + match catalog_entry_from_resource(resource) { + Some(entry) => catalog.push_entry(entry), + None => skipped_resources = skipped_resources.saturating_add(1), + } + } + + if truncated { + break; + } + let Some(next_cursor) = result.next_cursor else { + cursor = None; + break; + }; + if !seen_cursors.insert(next_cursor.clone()) { + catalog.warnings.push( + "Orchestrator skill resource pagination returned a duplicate cursor." + .to_string(), + ); + cursor = None; + break; + } + cursor = Some(next_cursor); + } + + if cursor.is_some() || truncated { + catalog.warnings.push(format!( + "Orchestrator skill discovery was truncated at {MAX_ORCHESTRATOR_SKILLS} skills or {MAX_RESOURCE_PAGES} resource pages." + )); + } + if skipped_resources > 0 { + catalog.warnings.push(format!( + "Skipped {skipped_resources} malformed orchestrator skill resources." + )); + } + + Ok(catalog) + }) + } + + fn read(&self, request: SkillReadRequest) -> SkillProviderFuture<'_, SkillReadResult> { + Box::pin(async move { + if request.authority + != SkillAuthority::new(SkillSourceKind::Orchestrator, CODEX_APPS_MCP_SERVER_NAME) + { + return Err(SkillProviderError::new(format!( + "orchestrator skill provider cannot read authority {}", + request.authority.id + ))); + } + let expected_resource = main_prompt_uri(&request.package.0); + if request.resource.as_str() != expected_resource { + return Err(SkillProviderError::new( + "orchestrator skill resource does not match its package", + )); + } + + let Some(client) = request.mcp_resources.as_ref() else { + return Err(SkillProviderError::new( + "session MCP resource client is not configured", + )); + }; + let result = tokio::time::timeout( + ORCHESTRATOR_SKILL_READ_TIMEOUT, + client.read_resource(CODEX_APPS_MCP_SERVER_NAME, request.resource.as_str()), + ) + .await + .map_err(|_| { + SkillProviderError::new(format!( + "orchestrator skill read timed out after {ORCHESTRATOR_SKILL_READ_TIMEOUT:?}" + )) + })? + .map_err(|err| { + SkillProviderError::new(format!( + "failed to read orchestrator skill resource {}: {err:#}", + request.resource.as_str() + )) + })?; + let contents = result + .contents + .into_iter() + .find_map(|contents| match contents { + ResourceContent::Text { uri, text, .. } if uri == request.resource.as_str() => { + Some(text) + } + ResourceContent::Text { .. } | ResourceContent::Blob { .. } => None, + }); + let Some(contents) = contents else { + return Err(SkillProviderError::new(format!( + "orchestrator skill resource {} did not return matching text contents", + request.resource.as_str() + ))); + }; + + Ok(SkillReadResult { + resource: request.resource, + contents, + }) + }) + } + + fn search(&self, _request: SkillSearchRequest) -> SkillProviderFuture<'_, SkillSearchResult> { + Box::pin(async { Ok(SkillSearchResult::default()) }) + } +} + +fn catalog_entry_from_resource(resource: &Resource) -> Option { + let uri = validated_skill_uri(resource.uri.as_str())?; + let meta = resource.meta.as_ref()?.as_object()?; + let skill_name = normalized_label(meta.get("skill_name")?.as_str()?, MAX_SKILL_NAME_CHARS)?; + let name = if meta.get("source").and_then(|value| value.as_str()) == Some("user") { + skill_name + } else { + let plugin_name = + normalized_label(meta.get("plugin_name")?.as_str()?, MAX_SKILL_NAME_CHARS)?; + let qualified_name = format!("{plugin_name}:{skill_name}"); + (qualified_name.chars().count() <= MAX_QUALIFIED_SKILL_NAME_CHARS) + .then_some(qualified_name)? + }; + let description = normalized_description(resource.description.as_deref().unwrap_or_default())?; + let main_prompt = main_prompt_uri(uri); + + Some( + SkillCatalogEntry::new( + SkillPackageId(uri.to_string()), + SkillAuthority::new(SkillSourceKind::Orchestrator, CODEX_APPS_MCP_SERVER_NAME), + name, + description, + SkillResourceId::new(main_prompt), + ) + .with_display_path(uri), + ) +} + +fn validated_skill_uri(uri: &str) -> Option<&str> { + let path = uri.strip_prefix("skill://")?; + let invalid = path.is_empty() + || uri.chars().count() > MAX_SKILL_URI_CHARS + || uri + .chars() + .any(|ch| ch.is_control() || ch.is_whitespace() || matches!(ch, '<' | '>')); + (!invalid).then_some(uri) +} + +fn normalized_label(value: &str, max_chars: usize) -> Option { + let value = normalized_single_line(value, max_chars)?; + let invalid = value.is_empty() || value.chars().any(|ch| matches!(ch, '&' | '<' | '>')); + (!invalid).then_some(value) +} + +fn normalized_description(value: &str) -> Option { + normalized_single_line(value, MAX_SKILL_DESCRIPTION_CHARS).map(|value| { + value + .replace('&', "&") + .replace('<', "<") + .replace('>', ">") + }) +} + +fn normalized_single_line(value: &str, max_chars: usize) -> Option { + let value = value.split_whitespace().collect::>().join(" "); + let valid = value.chars().count() <= max_chars && !value.chars().any(char::is_control); + valid.then_some(value) +} + +fn main_prompt_uri(package_uri: &str) -> String { + format!("{}/SKILL.md", package_uri.trim_end_matches('/')) +} diff --git a/codex-rs/ext/skills/src/render.rs b/codex-rs/ext/skills/src/render.rs index 6b77d0214..7018297d2 100644 --- a/codex-rs/ext/skills/src/render.rs +++ b/codex-rs/ext/skills/src/render.rs @@ -1,7 +1,4 @@ -use codex_core_skills::render_available_skills_body; -use codex_extension_api::ContextualUserFragment; -use codex_protocol::protocol::SKILLS_INSTRUCTIONS_CLOSE_TAG; -use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG; +use codex_core::context::AvailableSkillsInstructions; use codex_utils_string::take_bytes_at_char_boundary; use crate::catalog::SkillCatalog; @@ -11,30 +8,9 @@ const MAX_MAIN_PROMPT_BYTES: usize = 8_000; pub(crate) const MAX_SKILL_NAME_BYTES: usize = 256; pub(crate) const MAX_SKILL_PATH_BYTES: usize = 1_024; -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct AvailableSkillsFragment { - body: String, -} - -impl ContextualUserFragment for AvailableSkillsFragment { - fn role(&self) -> &'static str { - "developer" - } - - fn markers(&self) -> (&'static str, &'static str) { - Self::type_markers() - } - - fn body(&self) -> String { - self.body.clone() - } - - fn type_markers() -> (&'static str, &'static str) { - (SKILLS_INSTRUCTIONS_OPEN_TAG, SKILLS_INSTRUCTIONS_CLOSE_TAG) - } -} - -pub(crate) fn available_skills_fragment(catalog: &SkillCatalog) -> Option { +pub(crate) fn available_skills_fragment( + catalog: &SkillCatalog, +) -> Option { let mut total_bytes = 0usize; let mut omitted = 0usize; let mut skill_lines = Vec::new(); @@ -68,9 +44,7 @@ pub(crate) fn available_skills_fragment(catalog: &SkillCatalog) -> Option String { diff --git a/codex-rs/ext/skills/src/sources.rs b/codex-rs/ext/skills/src/sources.rs index 5ed51b878..d2aca25e7 100644 --- a/codex-rs/ext/skills/src/sources.rs +++ b/codex-rs/ext/skills/src/sources.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::catalog::SkillCatalog; use crate::catalog::SkillProviderError; +use crate::catalog::SkillProviderResult; use crate::catalog::SkillReadResult; use crate::catalog::SkillSearchResult; use crate::catalog::SkillSourceKind; @@ -39,15 +40,15 @@ impl SkillProviderSource { Self::new(SkillSourceKind::Executor, label, provider) } - pub fn remote(label: impl Into, provider: Arc) -> Self { - Self::new(SkillSourceKind::Remote, label, provider) + pub fn orchestrator(label: impl Into, provider: Arc) -> Self { + Self::new(SkillSourceKind::Orchestrator, label, provider) } fn should_list(&self, query: &SkillListQuery) -> bool { match &self.kind { SkillSourceKind::Host => query.include_host_skills, SkillSourceKind::Executor => !query.executor_roots.is_empty(), - SkillSourceKind::Remote => query.include_remote_skills, + SkillSourceKind::Orchestrator => query.include_orchestrator_skills, SkillSourceKind::Custom(_) => true, } } @@ -94,20 +95,48 @@ impl SkillProviders { self } - pub fn with_remote_provider(mut self, provider: Arc) -> Self { + pub fn with_orchestrator_provider(mut self, provider: Arc) -> Self { self.sources - .push(SkillProviderSource::remote("remote", provider)); + .push(SkillProviderSource::orchestrator("orchestrator", provider)); self } pub(crate) async fn list_for_turn(&self, query: SkillListQuery) -> SkillCatalog { + self.list_matching(&query, |source| source.should_list(&query)) + .await + } + + pub(crate) async fn list_orchestrator_for_turn( + &self, + query: SkillListQuery, + ) -> SkillProviderResult { let mut catalog = SkillCatalog::default(); for source in self .sources .iter() - .filter(|source| source.should_list(&query)) + .filter(|source| source.kind == SkillSourceKind::Orchestrator) { + let source_catalog = source.provider.list(query.clone()).await.map_err(|err| { + SkillProviderError::new(format!( + "{} skills unavailable: {}", + source.label, err.message + )) + })?; + catalog.extend(source_catalog); + } + + Ok(catalog) + } + + async fn list_matching( + &self, + query: &SkillListQuery, + should_list: impl Fn(&SkillProviderSource) -> bool, + ) -> SkillCatalog { + let mut catalog = SkillCatalog::default(); + + for source in self.sources.iter().filter(|source| should_list(source)) { extend_catalog( &mut catalog, source.provider.list(query.clone()).await, diff --git a/codex-rs/ext/skills/src/state.rs b/codex-rs/ext/skills/src/state.rs index aa77cce82..726487f5c 100644 --- a/codex-rs/ext/skills/src/state.rs +++ b/codex-rs/ext/skills/src/state.rs @@ -1,9 +1,12 @@ use codex_core::config::Config; use codex_protocol::capabilities::SelectedCapabilityRoot; +use std::future::Future; use std::sync::Mutex; +use tokio::sync::OnceCell; use crate::catalog::SkillCatalog; use crate::catalog::SkillCatalogEntry; +use crate::catalog::SkillProviderError; #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct SkillsExtensionConfig { @@ -24,6 +27,7 @@ impl SkillsExtensionConfig { pub(crate) struct SkillsThreadState { config: Mutex, selected_roots: Vec, + orchestrator_catalog: OnceCell, } impl SkillsThreadState { @@ -34,6 +38,7 @@ impl SkillsThreadState { Self { config: Mutex::new(config), selected_roots, + orchestrator_catalog: OnceCell::new(), } } @@ -54,6 +59,16 @@ impl SkillsThreadState { pub(crate) fn selected_roots(&self) -> &[SelectedCapabilityRoot] { &self.selected_roots } + + pub(crate) async fn orchestrator_catalog_snapshot( + &self, + initialize: impl Future> + Send, + ) -> Result { + self.orchestrator_catalog + .get_or_try_init(|| initialize) + .await + .cloned() + } } #[derive(Clone, Debug, Default, PartialEq, Eq)] diff --git a/codex-rs/ext/skills/tests/executor_file_system_authority.rs b/codex-rs/ext/skills/tests/executor_file_system_authority.rs index 026c3f334..94c6f3821 100644 --- a/codex-rs/ext/skills/tests/executor_file_system_authority.rs +++ b/codex-rs/ext/skills/tests/executor_file_system_authority.rs @@ -224,7 +224,8 @@ async fn executor_provider_reads_from_the_environment_instance_used_for_listing( host: None, include_host_skills: false, include_bundled_skills: true, - include_remote_skills: false, + include_orchestrator_skills: false, + mcp_resources: None, }) .await .expect("list executor skills"); @@ -246,6 +247,7 @@ async fn executor_provider_reads_from_the_environment_instance_used_for_listing( package: entry.id, resource: resource.clone(), host: None, + mcp_resources: None, }) .await .expect("read bound executor skill"), @@ -272,7 +274,6 @@ async fn selected_root_id_distinguishes_identical_executor_paths() { Arc::new(EnvironmentManager::default_for_tests()), /*restriction_product*/ None, ); - let catalog = provider .list(SkillListQuery { turn_id: "turn-1".to_string(), @@ -289,7 +290,8 @@ async fn selected_root_id_distinguishes_identical_executor_paths() { host: None, include_host_skills: false, include_bundled_skills: true, - include_remote_skills: false, + include_orchestrator_skills: false, + mcp_resources: None, }) .await .expect("list executor skills"); diff --git a/codex-rs/ext/skills/tests/skills_extension.rs b/codex-rs/ext/skills/tests/skills_extension.rs index dc1aa0140..432358754 100644 --- a/codex-rs/ext/skills/tests/skills_extension.rs +++ b/codex-rs/ext/skills/tests/skills_extension.rs @@ -24,6 +24,7 @@ use codex_skills_extension::catalog::SkillAuthority; use codex_skills_extension::catalog::SkillCatalog; use codex_skills_extension::catalog::SkillCatalogEntry; use codex_skills_extension::catalog::SkillPackageId; +use codex_skills_extension::catalog::SkillProviderError; use codex_skills_extension::catalog::SkillReadResult; use codex_skills_extension::catalog::SkillResourceId; use codex_skills_extension::catalog::SkillSearchResult; @@ -144,6 +145,8 @@ async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_in warnings: Vec::new(), }, read_requests: Arc::clone(&read_requests), + list_calls: None, + fail_first_list: false, }); let providers = SkillProviders::new().with_executor_provider(executor_provider); let mut builder = ExtensionRegistryBuilder::new(); @@ -239,6 +242,70 @@ async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_in Ok(()) } +#[tokio::test] +async fn orchestrator_catalog_snapshot_retries_failure_then_is_reused() -> TestResult { + let list_calls = Arc::new(AtomicUsize::new(0)); + let providers = + SkillProviders::new().with_orchestrator_provider(Arc::new(StaticSkillProvider { + catalog: SkillCatalog { + entries: vec![test_entry( + SkillSourceKind::Orchestrator, + "codex_apps", + "orchestrator/first", + "skill://orchestrator/first/SKILL.md", + )], + warnings: Vec::new(), + }, + read_requests: Arc::new(Mutex::new(Vec::new())), + list_calls: Some(Arc::clone(&list_calls)), + fail_first_list: true, + })); + let mut builder = ExtensionRegistryBuilder::new(); + install_with_providers(&mut builder, providers); + let registry = builder.build(); + let session_store = ExtensionData::new("session"); + let thread_store = ExtensionData::new("thread"); + let session_source = SessionSource::Cli; + let config = default_config().await?; + registry.thread_lifecycle_contributors()[0] + .on_thread_start(ThreadStartInput { + config: &config, + session_source: &session_source, + persistent_thread_state_available: true, + session_store: &session_store, + thread_store: &thread_store, + }) + .await; + + let initial_fragments = registry.context_contributors()[0] + .contribute(&session_store, &thread_store) + .await; + assert!(initial_fragments.is_empty()); + + for turn_id in ["turn-1", "turn-2"] { + let fragments = registry.turn_input_contributors()[0] + .contribute( + TurnInputContext { + turn_id: turn_id.to_string(), + user_input: vec![UserInput::Text { + text: "$first".to_string(), + text_elements: Vec::new(), + }], + environments: Vec::new(), + }, + &session_store, + &thread_store, + &ExtensionData::new(turn_id), + ) + .await; + assert_eq!(1, fragments.len()); + assert!(fragments[0].render().contains("first")); + } + assert_eq!(2, list_calls.load(Ordering::Relaxed)); + + Ok(()) +} + #[tokio::test] async fn root_qualified_locator_selects_only_the_matching_executor_skill() -> TestResult { let read_requests = Arc::new(Mutex::new(Vec::new())); @@ -262,6 +329,8 @@ async fn root_qualified_locator_selects_only_the_matching_executor_skill() -> Te warnings: Vec::new(), }, read_requests: Arc::clone(&read_requests), + list_calls: None, + fail_first_list: false, }); let providers = SkillProviders::new().with_executor_provider(executor_provider); let mut builder = ExtensionRegistryBuilder::new(); @@ -346,6 +415,8 @@ async fn prompt_hidden_skill_can_still_be_invoked() -> TestResult { warnings: Vec::new(), }, read_requests: Arc::clone(&read_requests), + list_calls: None, + fail_first_list: false, }); let providers = SkillProviders::new().with_host_provider(provider); let mut builder = ExtensionRegistryBuilder::new(); @@ -401,12 +472,25 @@ async fn prompt_hidden_skill_can_still_be_invoked() -> TestResult { struct StaticSkillProvider { catalog: SkillCatalog, read_requests: Arc>>, + list_calls: Option>, + fail_first_list: bool, } impl SkillProvider for StaticSkillProvider { fn list(&self, _query: SkillListQuery) -> SkillProviderFuture<'_, SkillCatalog> { + let list_call = self + .list_calls + .as_ref() + .map(|list_calls| list_calls.fetch_add(1, Ordering::Relaxed)); + let fail = self.fail_first_list && list_call == Some(0); let catalog = self.catalog.clone(); - Box::pin(async move { Ok(catalog) }) + Box::pin(async move { + if fail { + Err(SkillProviderError::new("temporary orchestrator failure")) + } else { + Ok(catalog) + } + }) } fn read(&self, request: SkillReadRequest) -> SkillProviderFuture<'_, SkillReadResult> {