skills: make backend plugin skills invocable without an executor (#27387)

## Why

#27198 made the extension-owned `codex_apps` MCP connection the hosted
plugin runtime, but its `mcp/skill` resources still bypassed the skills
extension. App-server could list and read those resources through
generic MCP APIs, but a thread with no selected environment did not
expose them in the model's skills catalog or load their `SKILL.md`
through `$skill`.

Hosted skills should stay remote while using the same typed catalog,
source authority, deduplication, bounded contextual catalog, and
selected-skill prompt injection as host and executor skills. They should
not be downloaded or exposed as ambient filesystem paths.

## What changed

- Add a session-scoped `McpResourceClient` over the replaceable MCP
connection manager so resource list/read calls follow startup and
refresh replacements.
- Add a `BackendSkillProvider` that pages `codex_apps` resources,
accepts bounded and validated `mcp/skill` entries, and reads a selected
skill's `SKILL.md` through the same MCP connection.
- Register the remote provider in app-server and include it in the
skills catalog even when a thread has no selected capability roots or
executor.
- Contribute hosted skill metadata through the bounded
`AvailableSkillsInstructions` developer-context path, exclude remote
entries from per-turn catalog injection, and classify `<skills>`
messages as contextual developer content so rollback can trim and
rebuild them correctly.

## Testing

- Extend the app-server MCP resource integration test with
`environments: []` to exercise two-page discovery, filter a
non-`mcp/skill` resource, verify the escaped developer catalog entry and
user-role `<skill>` fragment containing the fetched `SKILL.md`, and
preserve generic MCP resource reads.
- Add core event-mapping coverage that classifies `<skills>` developer
messages as contextual history.
This commit is contained in:
jif
2026-06-11 10:28:16 +01:00
committed by GitHub
Unverified
parent 53b5019745
commit a287c5dffd
26 changed files with 877 additions and 116 deletions
+2
View File
@@ -3220,6 +3220,7 @@ name = "codex-mcp"
version = "0.0.0"
dependencies = [
"anyhow",
"arc-swap",
"async-channel",
"codex-api",
"codex-async-utils",
@@ -3818,6 +3819,7 @@ dependencies = [
"codex-core-skills",
"codex-exec-server",
"codex-extension-api",
"codex-mcp",
"codex-protocol",
"codex-utils-absolute-path",
"codex-utils-string",
+4 -1
View File
@@ -76,7 +76,10 @@ where
codex_skills_extension::install_with_providers(
&mut builder,
codex_skills_extension::SkillProviders::new()
.with_executor_provider(executor_skill_provider),
.with_executor_provider(executor_skill_provider)
.with_orchestrator_provider(Arc::new(
codex_skills_extension::OrchestratorSkillProvider::new(),
)),
);
Arc::new(builder.build())
}
@@ -19,6 +19,8 @@ use codex_app_server_protocol::McpResourceReadResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput;
use codex_arg0::Arg0DispatchPaths;
use codex_config::CloudConfigBundleLoader;
use codex_config::LoaderOverrides;
@@ -30,9 +32,14 @@ use codex_protocol::protocol::SessionSource;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use rmcp::handler::server::ServerHandler;
use rmcp::model::ListResourcesResult;
use rmcp::model::Meta;
use rmcp::model::PaginatedRequestParams;
use rmcp::model::ProtocolVersion;
use rmcp::model::RawResource;
use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use rmcp::model::Resource;
use rmcp::model::ResourceContents;
use rmcp::model::ServerCapabilities;
use rmcp::model::ServerInfo;
@@ -41,6 +48,7 @@ use rmcp::service::RoleServer;
use rmcp::transport::StreamableHttpServerConfig;
use rmcp::transport::StreamableHttpService;
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use serde_json::json;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
@@ -51,49 +59,21 @@ const TEST_RESOURCE_URI: &str = "test://codex/resource";
const TEST_BLOB_RESOURCE_URI: &str = "test://codex/resource.bin";
const TEST_RESOURCE_BLOB: &str = "YmluYXJ5LXJlc291cmNl";
const TEST_RESOURCE_TEXT: &str = "Resource body from the MCP server.";
const SKILL_NAME: &str = "demo-plugin:deploy";
const RAW_SKILL_DESCRIPTION: &str = "Deploy\nthrough the <hosted> orchestrator.";
const SKILL_DESCRIPTION: &str = "Deploy through the &lt;hosted&gt; orchestrator.";
const SKILL_RESOURCE_URI: &str = "skill://plugin_demo/deploy";
const SKILL_MAIN_PROMPT_URI: &str = "skill://plugin_demo/deploy/SKILL.md";
const SKILL_MARKER: &str = "ORCHESTRATOR_SKILL_BODY_MARKER";
const SKILL_CONTENTS: &str = "---\nname: deploy\ndescription: Deploy through the orchestrator.\n---\n\n# Deploy\n\nORCHESTRATOR_SKILL_BODY_MARKER\n";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mcp_resource_read_returns_resource_contents() -> Result<()> {
let responses_server = responses::start_mock_server().await;
let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?;
let codex_home = TempDir::new()?;
let responses_server_uri = responses_server.uri();
std::fs::write(
codex_home.path().join("config.toml"),
format!(
r#"
model = "mock-model"
approval_policy = "untrusted"
sandbox_mode = "read-only"
model_provider = "mock_provider"
chatgpt_base_url = "{apps_server_url}"
mcp_oauth_credentials_store = "file"
[features]
apps = true
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{responses_server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let (_codex_home, mut mcp) =
start_resource_test_app_server(&apps_server_url, &responses_server_uri).await?;
let thread_start_id = mcp
.send_thread_start_request(ThreadStartParams {
@@ -120,7 +100,6 @@ stream_max_retries = 0
mcp.read_stream_until_response_message(RequestId::Integer(read_request_id)),
)
.await??;
assert_eq!(
to_response::<McpResourceReadResponse>(read_response)?,
expected_resource_read_response()
@@ -131,6 +110,87 @@ stream_max_retries = 0
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn codex_apps_resources_support_orchestrator_skills_without_an_environment() -> Result<()> {
let responses_server = responses::start_mock_server().await;
let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?;
let responses_server_uri = responses_server.uri();
let (_codex_home, mut mcp) =
start_resource_test_app_server(&apps_server_url, &responses_server_uri).await?;
let thread_start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
environments: Some(Vec::new()),
..Default::default()
})
.await?;
let thread_start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response(thread_start_resp)?;
let response_mock = responses::mount_sse_once(
&responses_server,
responses::sse(vec![
responses::ev_response_created("resp-orchestrator-skill"),
responses::ev_assistant_message("msg-orchestrator-skill", "Done"),
responses::ev_completed("resp-orchestrator-skill"),
]),
)
.await;
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![UserInput::Text {
text: format!("Use ${SKILL_NAME}"),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let request = response_mock.single_request();
let developer_messages = request.message_input_texts("developer");
let catalog_line = format!("- {SKILL_NAME}: {SKILL_DESCRIPTION} (file: {SKILL_RESOURCE_URI})");
assert_eq!(
1,
developer_messages
.iter()
.filter(|text| text.contains(&catalog_line))
.count()
);
assert!(
developer_messages
.iter()
.all(|text| !text.contains("ignored-plugin:ignored"))
);
let skill_fragments = request
.message_input_texts("user")
.into_iter()
.filter(|text| text.starts_with("<skill>"))
.collect::<Vec<_>>();
assert_eq!(1, skill_fragments.len());
assert!(skill_fragments[0].contains(&format!("<name>{SKILL_NAME}</name>")));
assert!(skill_fragments[0].contains(SKILL_MARKER));
apps_server_handle.abort();
let _ = apps_server_handle.await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mcp_resource_read_returns_resource_contents_without_thread() -> Result<()> {
let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?;
@@ -246,6 +306,52 @@ async fn mcp_resource_read_returns_error_for_unknown_thread() -> Result<()> {
Ok(())
}
async fn start_resource_test_app_server(
apps_server_url: &str,
responses_server_uri: &str,
) -> Result<(TempDir, TestAppServer)> {
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join("config.toml"),
format!(
r#"
model = "mock-model"
approval_policy = "untrusted"
sandbox_mode = "read-only"
model_provider = "mock_provider"
chatgpt_base_url = "{apps_server_url}"
mcp_oauth_credentials_store = "file"
[features]
apps = true
[skills]
include_instructions = true
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{responses_server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
Ok((codex_home, mcp))
}
async fn start_resource_apps_mcp_server() -> Result<(String, JoinHandle<()>)> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
@@ -292,12 +398,69 @@ impl ServerHandler for ResourceAppsMcpServer {
.with_protocol_version(ProtocolVersion::V_2025_06_18)
}
async fn list_resources(
&self,
request: Option<PaginatedRequestParams>,
_context: RequestContext<RoleServer>,
) -> Result<ListResourcesResult, rmcp::ErrorData> {
let cursor = request.and_then(|request| request.cursor);
if cursor.is_none() {
return Ok(ListResourcesResult {
resources: vec![skill_resource(
"skill://plugin_ignored/ignored",
"plugin_ignored/ignored",
"Not an MCP skill resource.",
"text/plain",
"ignored-plugin",
"ignored",
)],
next_cursor: Some("skills-page".to_string()),
meta: None,
});
}
if cursor.as_deref() == Some("failing-page") {
return Err(rmcp::ErrorData::internal_error(
"simulated later-page failure",
/*data*/ None,
));
}
if cursor.as_deref() != Some("skills-page") {
return Err(rmcp::ErrorData::invalid_params(
"unexpected resources/list cursor",
/*data*/ None,
));
}
Ok(ListResourcesResult {
resources: vec![skill_resource(
SKILL_RESOURCE_URI,
"plugin_demo/deploy",
RAW_SKILL_DESCRIPTION,
"mcp/skill",
"demo-plugin",
"deploy",
)],
next_cursor: Some("failing-page".to_string()),
meta: None,
})
}
async fn read_resource(
&self,
request: ReadResourceRequestParams,
_context: RequestContext<RoleServer>,
) -> Result<ReadResourceResult, rmcp::ErrorData> {
let uri = request.uri;
if uri == SKILL_MAIN_PROMPT_URI {
return Ok(ReadResourceResult::new(vec![
ResourceContents::TextResourceContents {
uri: SKILL_MAIN_PROMPT_URI.to_string(),
mime_type: Some("text/markdown".to_string()),
text: SKILL_CONTENTS.to_string(),
meta: None,
},
]));
}
if uri != TEST_RESOURCE_URI {
return Err(rmcp::ErrorData::resource_not_found(
format!("resource not found: {uri}"),
@@ -321,3 +484,27 @@ impl ServerHandler for ResourceAppsMcpServer {
]))
}
}
fn skill_resource(
uri: &str,
name: &str,
description: &str,
mime_type: &str,
plugin_name: &str,
skill_name: &str,
) -> Resource {
Resource::new(
RawResource::new(uri, name)
.with_description(description)
.with_mime_type(mime_type)
.with_meta(skill_resource_meta(plugin_name, skill_name)),
/*annotations*/ None,
)
}
fn skill_resource_meta(plugin_name: &str, skill_name: &str) -> Meta {
Meta(serde_json::Map::from_iter([
("plugin_name".to_string(), json!(plugin_name)),
("skill_name".to_string(), json!(skill_name)),
]))
}
+1
View File
@@ -14,6 +14,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
arc-swap = { workspace = true }
async-channel = { workspace = true }
codex-async-utils = { workspace = true }
codex-api = { workspace = true }
@@ -348,6 +348,10 @@ impl McpConnectionManager {
!self.clients.is_empty()
}
pub(crate) fn contains_server(&self, server_name: &str) -> bool {
self.clients.contains_key(server_name)
}
/// Stop all MCP clients owned by this manager and terminate stdio server processes.
pub async fn shutdown(&self) {
self.startup_cancellation_token.cancel();
+4
View File
@@ -3,6 +3,9 @@ pub use connection_manager::tool_is_model_visible;
pub use elicitation::ElicitationReviewRequest;
pub use elicitation::ElicitationReviewer;
pub use elicitation::ElicitationReviewerHandle;
pub use resource_client::McpResourceClient;
pub use resource_client::McpResourcePage;
pub use resource_client::McpResourceReadResult;
pub use rmcp_client::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use runtime::McpRuntimeContext;
pub use runtime::SandboxState;
@@ -58,6 +61,7 @@ pub(crate) mod codex_apps;
pub(crate) mod connection_manager;
pub(crate) mod elicitation;
pub(crate) mod mcp;
mod resource_client;
pub(crate) mod rmcp_client;
pub(crate) mod runtime;
pub(crate) mod server;
+108
View File
@@ -0,0 +1,108 @@
use std::sync::Arc;
use anyhow::Context;
use anyhow::Result;
use arc_swap::ArcSwap;
use codex_protocol::mcp::Resource;
use codex_protocol::mcp::ResourceContent;
use rmcp::model::PaginatedRequestParams;
use rmcp::model::ReadResourceRequestParams;
use crate::McpConnectionManager;
/// One page of resources returned by an MCP server.
#[derive(Clone, Debug, PartialEq)]
pub struct McpResourcePage {
/// Resources advertised on this page.
pub resources: Vec<Resource>,
/// Opaque cursor to supply when requesting the next page.
pub next_cursor: Option<String>,
}
/// Contents returned after reading one MCP resource.
#[derive(Clone, Debug, PartialEq)]
pub struct McpResourceReadResult {
/// Text or blob content returned for the requested resource.
pub contents: Vec<ResourceContent>,
}
/// Session-scoped access to MCP resources through the currently installed manager.
///
/// The client retains the manager's shared publication handle rather than a manager
/// snapshot, so calls automatically use replacements installed during startup and refresh.
#[derive(Clone)]
pub struct McpResourceClient {
manager: Arc<ArcSwap<McpConnectionManager>>,
}
impl std::fmt::Debug for McpResourceClient {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("McpResourceClient")
.finish_non_exhaustive()
}
}
impl McpResourceClient {
/// Creates a resource client backed by the session's replaceable MCP manager.
pub fn new(manager: Arc<ArcSwap<McpConnectionManager>>) -> Self {
Self { manager }
}
/// Returns whether the current manager contains the named server.
///
/// This does not wait for server startup or imply that startup succeeded.
pub async fn has_server(&self, server: &str) -> bool {
self.manager.load_full().contains_server(server)
}
/// Lists one resource page from the named server.
pub async fn list_resources(
&self,
server: &str,
cursor: Option<String>,
) -> Result<McpResourcePage> {
let params =
cursor.map(|cursor| PaginatedRequestParams::default().with_cursor(Some(cursor)));
let result = self
.manager
.load_full()
.list_resources(server, params)
.await?;
let resources = result
.resources
.into_iter()
.map(resource_from_rmcp)
.collect::<Result<Vec<_>>>()?;
Ok(McpResourcePage {
resources,
next_cursor: result.next_cursor,
})
}
/// Reads one resource from the named server.
pub async fn read_resource(&self, server: &str, uri: &str) -> Result<McpResourceReadResult> {
let result = self
.manager
.load_full()
.read_resource(server, ReadResourceRequestParams::new(uri.to_string()))
.await?;
let contents = result
.contents
.into_iter()
.map(resource_content_from_rmcp)
.collect::<Result<Vec<_>>>()?;
Ok(McpResourceReadResult { contents })
}
}
fn resource_from_rmcp(resource: rmcp::model::Resource) -> Result<Resource> {
let value = serde_json::to_value(resource).context("failed to serialize MCP resource")?;
Resource::from_mcp_value(value).context("failed to convert MCP resource")
}
fn resource_content_from_rmcp(content: rmcp::model::ResourceContents) -> Result<ResourceContent> {
let value =
serde_json::to_value(content).context("failed to serialize MCP resource content")?;
serde_json::from_value(value).context("failed to convert MCP resource content")
}
@@ -5,12 +5,23 @@ use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG;
use super::ContextualUserFragment;
/// Model-context fragment describing the skills available to Codex.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct AvailableSkillsInstructions {
pub struct AvailableSkillsInstructions {
skill_root_lines: Vec<String>,
skill_lines: Vec<String>,
}
impl AvailableSkillsInstructions {
/// Creates a skills context fragment from pre-rendered catalog lines.
pub fn from_skill_lines(skill_lines: Vec<String>) -> Self {
Self {
skill_root_lines: Vec::new(),
skill_lines,
}
}
}
impl From<AvailableSkills> for AvailableSkillsInstructions {
fn from(available_skills: AvailableSkills) -> Self {
Self {
+1 -1
View File
@@ -31,7 +31,7 @@ mod user_shell_command;
pub(crate) use approved_command_prefix_saved::ApprovedCommandPrefixSaved;
pub(crate) use apps_instructions::AppsInstructions;
pub(crate) use available_plugins_instructions::AvailablePluginsInstructions;
pub(crate) use available_skills_instructions::AvailableSkillsInstructions;
pub use available_skills_instructions::AvailableSkillsInstructions;
pub(crate) use codex_context_fragments::AdditionalContextDeveloperFragment;
pub(crate) use codex_context_fragments::AdditionalContextUserFragment;
pub use codex_context_fragments::ContextualUserFragment;
+2
View File
@@ -16,6 +16,7 @@ use codex_protocol::models::is_local_image_close_tag_text;
use codex_protocol::models::is_local_image_open_tag_text;
use codex_protocol::protocol::COLLABORATION_MODE_OPEN_TAG;
use codex_protocol::protocol::REALTIME_CONVERSATION_OPEN_TAG;
use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG;
use codex_protocol::user_input::UserInput;
use tracing::warn;
use uuid::Uuid;
@@ -29,6 +30,7 @@ const CONTEXTUAL_DEVELOPER_PREFIXES: &[&str] = &[
"<model_switch>",
COLLABORATION_MODE_OPEN_TAG,
REALTIME_CONVERSATION_OPEN_TAG,
SKILLS_INSTRUCTIONS_OPEN_TAG,
"<personality_spec>",
"<token_budget>",
];
+10
View File
@@ -15,9 +15,19 @@ use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::WebSearchAction;
use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG;
use codex_protocol::user_input::UserInput;
use pretty_assertions::assert_eq;
#[test]
fn recognizes_skills_instructions_as_contextual_developer_content() {
assert!(is_contextual_dev_message_content(&[
ContentItem::InputText {
text: format!("{SKILLS_INSTRUCTIONS_OPEN_TAG}\n## Skills"),
},
]));
}
#[test]
fn recognizes_token_budget_as_contextual_developer_content() {
let content = vec![ContentItem::InputText {
+1
View File
@@ -65,6 +65,7 @@ use codex_login::CodexAuth;
use codex_login::auth_env_telemetry::collect_auth_env_telemetry;
use codex_login::default_client::originator;
use codex_mcp::McpConnectionManager;
use codex_mcp::McpResourceClient;
use codex_mcp::McpRuntimeContext;
use codex_mcp::codex_apps_tools_cache_key;
use codex_models_manager::manager::RefreshStrategy;
+13 -7
View File
@@ -946,8 +946,20 @@ impl Session {
.effective_agent_max_threads(MultiAgentVersion::V2)
.unwrap_or(usize::MAX),
);
// Keep one stable manager handle for the session so extension resource clients
// automatically observe the manager installed at startup and on later refreshes.
let mcp_connection_manager = Arc::new(arc_swap::ArcSwap::from_pointee(
McpConnectionManager::new_uninitialized_with_permission_profile(
&config.permissions.approval_policy,
config.permissions.permission_profile(),
config.prefix_mcp_tool_names(),
),
));
let session_extension_data =
codex_extension_api::ExtensionData::new(session_id.to_string());
session_extension_data.insert(McpResourceClient::new(Arc::clone(
&mcp_connection_manager,
)));
let thread_extension_data = codex_extension_api::ExtensionData::new_with_init(
thread_id.to_string(),
thread_extension_init,
@@ -970,13 +982,7 @@ impl Session {
// before any MCP-related events. It is reasonable to consider
// changing this to use Option or OnceCell, though the current
// setup is straightforward enough and performs well.
mcp_connection_manager: arc_swap::ArcSwap::from_pointee(
McpConnectionManager::new_uninitialized_with_permission_profile(
&config.permissions.approval_policy,
config.permissions.permission_profile(),
config.prefix_mcp_tool_names(),
),
),
mcp_connection_manager,
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
+4 -4
View File
@@ -4940,13 +4940,13 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
);
let services = SessionServices {
mcp_connection_manager: arc_swap::ArcSwap::from_pointee(
mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from_pointee(
McpConnectionManager::new_uninitialized_with_permission_profile(
&config.permissions.approval_policy,
config.permissions.permission_profile(),
config.prefix_mcp_tool_names(),
),
),
)),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
@@ -7018,13 +7018,13 @@ where
);
let services = SessionServices {
mcp_connection_manager: arc_swap::ArcSwap::from_pointee(
mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from_pointee(
McpConnectionManager::new_uninitialized_with_permission_profile(
&config.permissions.approval_policy,
config.permissions.permission_profile(),
config.prefix_mcp_tool_names(),
),
),
)),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
+1 -1
View File
@@ -40,7 +40,7 @@ use tokio_util::sync::CancellationToken;
pub(crate) struct SessionServices {
/// The latest manager; callers retain an owned handle while performing MCP I/O.
pub(crate) mcp_connection_manager: ArcSwap<McpConnectionManager>,
pub(crate) mcp_connection_manager: Arc<ArcSwap<McpConnectionManager>>,
pub(crate) mcp_startup_cancellation_token: Mutex<CancellationToken>,
pub(crate) unified_exec_manager: UnifiedExecProcessManager,
#[cfg_attr(not(unix), allow(dead_code))]
+2
View File
@@ -18,9 +18,11 @@ codex-core = { workspace = true }
codex-core-skills = { workspace = true }
codex-exec-server = { workspace = true }
codex-extension-api = { workspace = true }
codex-mcp = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-string = { workspace = true }
tokio = { workspace = true, features = ["sync", "time"] }
[dev-dependencies]
async-trait = { workspace = true }
+3 -3
View File
@@ -9,8 +9,8 @@ pub enum SkillSourceKind {
Host,
/// Skills owned by an execution environment.
Executor,
/// Skills read through an authenticated remote catalog/API.
Remote,
/// Skills owned by the orchestrator rather than an execution environment.
Orchestrator,
/// Extension-private source kind for future providers that do not fit an
/// existing transport category.
Custom(String),
@@ -25,7 +25,7 @@ impl SkillSourceKind {
match self {
Self::Host => "host",
Self::Executor => "executor",
Self::Remote => "remote",
Self::Orchestrator => "orchestrator",
Self::Custom(kind) => kind,
}
}
+52 -18
View File
@@ -17,11 +17,13 @@ use codex_extension_api::ThreadLifecycleContributor;
use codex_extension_api::ThreadStartInput;
use codex_extension_api::TurnInputContext;
use codex_extension_api::TurnInputContributor;
use codex_mcp::McpResourceClient;
use codex_protocol::capabilities::SelectedCapabilityRoot;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::WarningEvent;
use crate::catalog::SkillCatalog;
use crate::catalog::SkillCatalogEntry;
use crate::catalog::SkillReadResult;
use crate::catalog::SkillSourceKind;
@@ -84,7 +86,7 @@ impl ConfigContributor<Config> for SkillsExtension {
impl ContextContributor for SkillsExtension {
fn contribute<'a>(
&'a self,
_session_store: &'a ExtensionData,
session_store: &'a ExtensionData,
thread_store: &'a ExtensionData,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<PromptFragment>> + Send + 'a>> {
Box::pin(async move {
@@ -92,19 +94,22 @@ impl ContextContributor for SkillsExtension {
return Vec::new();
};
let config = thread_state.config();
if !config.include_instructions || thread_state.selected_roots().is_empty() {
if !config.include_instructions {
return Vec::new();
}
let catalog = self
.providers
.list_for_turn(SkillListQuery {
turn_id: thread_store.level_id().to_string(),
executor_roots: thread_state.selected_roots().to_vec(),
host: None,
include_host_skills: false,
include_bundled_skills: config.bundled_skills_enabled,
include_remote_skills: false,
})
.list_skills(
SkillListQuery {
turn_id: thread_store.level_id().to_string(),
executor_roots: thread_state.selected_roots().to_vec(),
host: None,
include_host_skills: false,
include_bundled_skills: config.bundled_skills_enabled,
include_orchestrator_skills: true,
mcp_resources: session_store.get::<McpResourceClient>(),
},
&thread_state,
)
.await;
for warning in &catalog.warnings {
self.emit_warning(thread_store.level_id(), warning.clone());
@@ -121,7 +126,7 @@ impl TurnInputContributor for SkillsExtension {
fn contribute<'a>(
&'a self,
input: TurnInputContext,
_session_store: &'a ExtensionData,
session_store: &'a ExtensionData,
thread_store: &'a ExtensionData,
turn_store: &'a ExtensionData,
) -> ExtensionFuture<'a, Vec<Box<dyn ContextualUserFragment + Send>>> {
@@ -138,9 +143,10 @@ impl TurnInputContributor for SkillsExtension {
host: host_loaded_skills.clone(),
include_host_skills: true,
include_bundled_skills: config.bundled_skills_enabled,
include_remote_skills: true,
include_orchestrator_skills: true,
mcp_resources: session_store.get::<McpResourceClient>(),
};
let catalog = self.providers.list_for_turn(query).await;
let catalog = self.list_skills(query, &thread_state).await;
for warning in &catalog.warnings {
self.emit_warning(&input.turn_id, warning.clone());
}
@@ -149,9 +155,10 @@ impl TurnInputContributor for SkillsExtension {
let mut fragments: Vec<Box<dyn ContextualUserFragment + Send>> = Vec::new();
if config.include_instructions {
let mut turn_catalog = catalog.clone();
turn_catalog
.entries
.retain(|entry| entry.authority.kind != SkillSourceKind::Executor);
turn_catalog.entries.retain(|entry| {
entry.authority.kind != SkillSourceKind::Executor
&& entry.authority.kind != SkillSourceKind::Orchestrator
});
if let Some(fragment) = available_skills_fragment(&turn_catalog) {
fragments.push(Box::new(fragment));
}
@@ -162,7 +169,7 @@ impl TurnInputContributor for SkillsExtension {
let mut injected_host_skill_prompts = InjectedHostSkillPrompts::default();
for entry in &selected_entries {
match self
.read_main_prompt(entry, host_loaded_skills.clone())
.read_main_prompt(entry, host_loaded_skills.clone(), session_store)
.await
{
Ok(read_result) => {
@@ -232,10 +239,36 @@ impl TurnInputContributor for SkillsExtension {
}
impl SkillsExtension {
async fn list_skills(
&self,
mut query: SkillListQuery,
thread_state: &SkillsThreadState,
) -> SkillCatalog {
let include_orchestrator_skills = query.include_orchestrator_skills;
let orchestrator_query = query.clone();
query.include_orchestrator_skills = false;
let mut catalog = self.providers.list_for_turn(query).await;
if include_orchestrator_skills {
match thread_state
.orchestrator_catalog_snapshot(
self.providers
.list_orchestrator_for_turn(orchestrator_query),
)
.await
{
Ok(orchestrator_catalog) => catalog.extend(orchestrator_catalog),
Err(err) => catalog.warnings.push(err.message),
}
}
catalog
}
async fn read_main_prompt(
&self,
entry: &SkillCatalogEntry,
host_loaded_skills: Option<Arc<HostLoadedSkills>>,
session_store: &ExtensionData,
) -> Result<SkillReadResult, String> {
self.providers
.read(SkillReadRequest {
@@ -243,6 +276,7 @@ impl SkillsExtension {
package: entry.id.clone(),
resource: entry.main_prompt.clone(),
host: host_loaded_skills,
mcp_resources: session_store.get::<McpResourceClient>(),
})
.await
.map_err(|err| err.message)
+1
View File
@@ -10,6 +10,7 @@ pub use extension::install;
pub use extension::install_with_providers;
pub use provider::ExecutorSkillProvider;
pub use provider::HostSkillProvider;
pub use provider::OrchestratorSkillProvider;
pub use provider::SkillProvider;
pub use sources::SkillProviderSource;
pub use sources::SkillProviders;
+6 -1
View File
@@ -4,8 +4,10 @@ use std::sync::Arc;
mod executor;
mod host;
mod orchestrator;
use codex_core_skills::HostLoadedSkills;
use codex_mcp::McpResourceClient;
use codex_protocol::capabilities::SelectedCapabilityRoot;
use crate::catalog::SkillAuthority;
@@ -18,6 +20,7 @@ use crate::catalog::SkillSearchResult;
pub use executor::ExecutorSkillProvider;
pub use host::HostSkillProvider;
pub use orchestrator::OrchestratorSkillProvider;
#[derive(Clone, Debug)]
pub struct SkillListQuery {
@@ -26,7 +29,8 @@ pub struct SkillListQuery {
pub host: Option<Arc<HostLoadedSkills>>,
pub include_host_skills: bool,
pub include_bundled_skills: bool,
pub include_remote_skills: bool,
pub include_orchestrator_skills: bool,
pub mcp_resources: Option<Arc<McpResourceClient>>,
}
#[derive(Clone, Debug)]
@@ -35,6 +39,7 @@ pub struct SkillReadRequest {
pub package: SkillPackageId,
pub resource: SkillResourceId,
pub host: Option<Arc<HostLoadedSkills>>,
pub mcp_resources: Option<Arc<McpResourceClient>>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -0,0 +1,276 @@
use std::collections::HashSet;
use std::time::Duration;
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
use codex_protocol::mcp::Resource;
use codex_protocol::mcp::ResourceContent;
use crate::catalog::SkillAuthority;
use crate::catalog::SkillCatalog;
use crate::catalog::SkillCatalogEntry;
use crate::catalog::SkillPackageId;
use crate::catalog::SkillProviderError;
use crate::catalog::SkillReadResult;
use crate::catalog::SkillResourceId;
use crate::catalog::SkillSearchResult;
use crate::catalog::SkillSourceKind;
use crate::provider::SkillListQuery;
use crate::provider::SkillProvider;
use crate::provider::SkillProviderFuture;
use crate::provider::SkillReadRequest;
use crate::provider::SkillSearchRequest;
const ORCHESTRATOR_SKILL_MIME_TYPE: &str = "mcp/skill";
const ORCHESTRATOR_SKILL_DISCOVERY_TIMEOUT: Duration = Duration::from_secs(10);
const ORCHESTRATOR_SKILL_READ_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_RESOURCE_PAGES: usize = 10;
const MAX_ORCHESTRATOR_SKILLS: usize = 100;
const MAX_SKILL_NAME_CHARS: usize = 64;
const MAX_QUALIFIED_SKILL_NAME_CHARS: usize = 128;
const MAX_SKILL_DESCRIPTION_CHARS: usize = 1_024;
const MAX_SKILL_URI_CHARS: usize = 1_024;
/// Discovers and reads skills owned by the orchestrator.
///
/// The provider uses session-scoped resources without exposing the transport or
/// resource server to callers that configure the skills extension.
#[derive(Clone, Debug, Default)]
pub struct OrchestratorSkillProvider;
impl OrchestratorSkillProvider {
pub fn new() -> Self {
Self
}
}
impl SkillProvider for OrchestratorSkillProvider {
fn list(&self, query: SkillListQuery) -> SkillProviderFuture<'_, SkillCatalog> {
Box::pin(async move {
let Some(client) = query.mcp_resources else {
return Ok(SkillCatalog::default());
};
if !client.has_server(CODEX_APPS_MCP_SERVER_NAME).await {
return Ok(SkillCatalog::default());
}
let discovery_deadline =
tokio::time::Instant::now() + ORCHESTRATOR_SKILL_DISCOVERY_TIMEOUT;
let mut catalog = SkillCatalog::default();
let mut cursor = None;
let mut seen_cursors = HashSet::new();
let mut skill_resources_seen = 0usize;
let mut skipped_resources = 0usize;
let mut truncated = false;
let mut completed_pages = 0usize;
for _ in 0..MAX_RESOURCE_PAGES {
let page = match tokio::time::timeout_at(
discovery_deadline,
client.list_resources(CODEX_APPS_MCP_SERVER_NAME, cursor.clone()),
)
.await
{
Ok(result) => result.map_err(|err| {
SkillProviderError::new(format!(
"failed to list orchestrator skill resources: {err:#}"
))
}),
Err(_) => Err(SkillProviderError::new(format!(
"orchestrator skill discovery timed out after {ORCHESTRATOR_SKILL_DISCOVERY_TIMEOUT:?}"
))),
};
let result = match page {
Ok(result) => result,
Err(err) if completed_pages == 0 => return Err(err),
Err(err) => {
let page_word = if completed_pages == 1 {
"page"
} else {
"pages"
};
catalog.warnings.push(format!(
"Orchestrator skill discovery stopped after {completed_pages} resource {page_word}: {}",
err.message
));
cursor = None;
break;
}
};
completed_pages = completed_pages.saturating_add(1);
for resource in &result.resources {
if resource.mime_type.as_deref() != Some(ORCHESTRATOR_SKILL_MIME_TYPE) {
continue;
}
if skill_resources_seen >= MAX_ORCHESTRATOR_SKILLS {
truncated = true;
break;
}
skill_resources_seen = skill_resources_seen.saturating_add(1);
match catalog_entry_from_resource(resource) {
Some(entry) => catalog.push_entry(entry),
None => skipped_resources = skipped_resources.saturating_add(1),
}
}
if truncated {
break;
}
let Some(next_cursor) = result.next_cursor else {
cursor = None;
break;
};
if !seen_cursors.insert(next_cursor.clone()) {
catalog.warnings.push(
"Orchestrator skill resource pagination returned a duplicate cursor."
.to_string(),
);
cursor = None;
break;
}
cursor = Some(next_cursor);
}
if cursor.is_some() || truncated {
catalog.warnings.push(format!(
"Orchestrator skill discovery was truncated at {MAX_ORCHESTRATOR_SKILLS} skills or {MAX_RESOURCE_PAGES} resource pages."
));
}
if skipped_resources > 0 {
catalog.warnings.push(format!(
"Skipped {skipped_resources} malformed orchestrator skill resources."
));
}
Ok(catalog)
})
}
fn read(&self, request: SkillReadRequest) -> SkillProviderFuture<'_, SkillReadResult> {
Box::pin(async move {
if request.authority
!= SkillAuthority::new(SkillSourceKind::Orchestrator, CODEX_APPS_MCP_SERVER_NAME)
{
return Err(SkillProviderError::new(format!(
"orchestrator skill provider cannot read authority {}",
request.authority.id
)));
}
let expected_resource = main_prompt_uri(&request.package.0);
if request.resource.as_str() != expected_resource {
return Err(SkillProviderError::new(
"orchestrator skill resource does not match its package",
));
}
let Some(client) = request.mcp_resources.as_ref() else {
return Err(SkillProviderError::new(
"session MCP resource client is not configured",
));
};
let result = tokio::time::timeout(
ORCHESTRATOR_SKILL_READ_TIMEOUT,
client.read_resource(CODEX_APPS_MCP_SERVER_NAME, request.resource.as_str()),
)
.await
.map_err(|_| {
SkillProviderError::new(format!(
"orchestrator skill read timed out after {ORCHESTRATOR_SKILL_READ_TIMEOUT:?}"
))
})?
.map_err(|err| {
SkillProviderError::new(format!(
"failed to read orchestrator skill resource {}: {err:#}",
request.resource.as_str()
))
})?;
let contents = result
.contents
.into_iter()
.find_map(|contents| match contents {
ResourceContent::Text { uri, text, .. } if uri == request.resource.as_str() => {
Some(text)
}
ResourceContent::Text { .. } | ResourceContent::Blob { .. } => None,
});
let Some(contents) = contents else {
return Err(SkillProviderError::new(format!(
"orchestrator skill resource {} did not return matching text contents",
request.resource.as_str()
)));
};
Ok(SkillReadResult {
resource: request.resource,
contents,
})
})
}
fn search(&self, _request: SkillSearchRequest) -> SkillProviderFuture<'_, SkillSearchResult> {
Box::pin(async { Ok(SkillSearchResult::default()) })
}
}
fn catalog_entry_from_resource(resource: &Resource) -> Option<SkillCatalogEntry> {
let uri = validated_skill_uri(resource.uri.as_str())?;
let meta = resource.meta.as_ref()?.as_object()?;
let skill_name = normalized_label(meta.get("skill_name")?.as_str()?, MAX_SKILL_NAME_CHARS)?;
let name = if meta.get("source").and_then(|value| value.as_str()) == Some("user") {
skill_name
} else {
let plugin_name =
normalized_label(meta.get("plugin_name")?.as_str()?, MAX_SKILL_NAME_CHARS)?;
let qualified_name = format!("{plugin_name}:{skill_name}");
(qualified_name.chars().count() <= MAX_QUALIFIED_SKILL_NAME_CHARS)
.then_some(qualified_name)?
};
let description = normalized_description(resource.description.as_deref().unwrap_or_default())?;
let main_prompt = main_prompt_uri(uri);
Some(
SkillCatalogEntry::new(
SkillPackageId(uri.to_string()),
SkillAuthority::new(SkillSourceKind::Orchestrator, CODEX_APPS_MCP_SERVER_NAME),
name,
description,
SkillResourceId::new(main_prompt),
)
.with_display_path(uri),
)
}
fn validated_skill_uri(uri: &str) -> Option<&str> {
let path = uri.strip_prefix("skill://")?;
let invalid = path.is_empty()
|| uri.chars().count() > MAX_SKILL_URI_CHARS
|| uri
.chars()
.any(|ch| ch.is_control() || ch.is_whitespace() || matches!(ch, '<' | '>'));
(!invalid).then_some(uri)
}
fn normalized_label(value: &str, max_chars: usize) -> Option<String> {
let value = normalized_single_line(value, max_chars)?;
let invalid = value.is_empty() || value.chars().any(|ch| matches!(ch, '&' | '<' | '>'));
(!invalid).then_some(value)
}
fn normalized_description(value: &str) -> Option<String> {
normalized_single_line(value, MAX_SKILL_DESCRIPTION_CHARS).map(|value| {
value
.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
})
}
fn normalized_single_line(value: &str, max_chars: usize) -> Option<String> {
let value = value.split_whitespace().collect::<Vec<_>>().join(" ");
let valid = value.chars().count() <= max_chars && !value.chars().any(char::is_control);
valid.then_some(value)
}
fn main_prompt_uri(package_uri: &str) -> String {
format!("{}/SKILL.md", package_uri.trim_end_matches('/'))
}
+5 -31
View File
@@ -1,7 +1,4 @@
use codex_core_skills::render_available_skills_body;
use codex_extension_api::ContextualUserFragment;
use codex_protocol::protocol::SKILLS_INSTRUCTIONS_CLOSE_TAG;
use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG;
use codex_core::context::AvailableSkillsInstructions;
use codex_utils_string::take_bytes_at_char_boundary;
use crate::catalog::SkillCatalog;
@@ -11,30 +8,9 @@ const MAX_MAIN_PROMPT_BYTES: usize = 8_000;
pub(crate) const MAX_SKILL_NAME_BYTES: usize = 256;
pub(crate) const MAX_SKILL_PATH_BYTES: usize = 1_024;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct AvailableSkillsFragment {
body: String,
}
impl ContextualUserFragment for AvailableSkillsFragment {
fn role(&self) -> &'static str {
"developer"
}
fn markers(&self) -> (&'static str, &'static str) {
Self::type_markers()
}
fn body(&self) -> String {
self.body.clone()
}
fn type_markers() -> (&'static str, &'static str) {
(SKILLS_INSTRUCTIONS_OPEN_TAG, SKILLS_INSTRUCTIONS_CLOSE_TAG)
}
}
pub(crate) fn available_skills_fragment(catalog: &SkillCatalog) -> Option<AvailableSkillsFragment> {
pub(crate) fn available_skills_fragment(
catalog: &SkillCatalog,
) -> Option<AvailableSkillsInstructions> {
let mut total_bytes = 0usize;
let mut omitted = 0usize;
let mut skill_lines = Vec::new();
@@ -68,9 +44,7 @@ pub(crate) fn available_skills_fragment(catalog: &SkillCatalog) -> Option<Availa
));
}
Some(AvailableSkillsFragment {
body: render_available_skills_body(&[], &skill_lines),
})
Some(AvailableSkillsInstructions::from_skill_lines(skill_lines))
}
fn render_skill_line(name: &str, description: &str, path: &str) -> String {
+35 -6
View File
@@ -3,6 +3,7 @@ use std::sync::Arc;
use crate::catalog::SkillCatalog;
use crate::catalog::SkillProviderError;
use crate::catalog::SkillProviderResult;
use crate::catalog::SkillReadResult;
use crate::catalog::SkillSearchResult;
use crate::catalog::SkillSourceKind;
@@ -39,15 +40,15 @@ impl SkillProviderSource {
Self::new(SkillSourceKind::Executor, label, provider)
}
pub fn remote(label: impl Into<String>, provider: Arc<dyn SkillProvider>) -> Self {
Self::new(SkillSourceKind::Remote, label, provider)
pub fn orchestrator(label: impl Into<String>, provider: Arc<dyn SkillProvider>) -> Self {
Self::new(SkillSourceKind::Orchestrator, label, provider)
}
fn should_list(&self, query: &SkillListQuery) -> bool {
match &self.kind {
SkillSourceKind::Host => query.include_host_skills,
SkillSourceKind::Executor => !query.executor_roots.is_empty(),
SkillSourceKind::Remote => query.include_remote_skills,
SkillSourceKind::Orchestrator => query.include_orchestrator_skills,
SkillSourceKind::Custom(_) => true,
}
}
@@ -94,20 +95,48 @@ impl SkillProviders {
self
}
pub fn with_remote_provider(mut self, provider: Arc<dyn SkillProvider>) -> Self {
pub fn with_orchestrator_provider(mut self, provider: Arc<dyn SkillProvider>) -> Self {
self.sources
.push(SkillProviderSource::remote("remote", provider));
.push(SkillProviderSource::orchestrator("orchestrator", provider));
self
}
pub(crate) async fn list_for_turn(&self, query: SkillListQuery) -> SkillCatalog {
self.list_matching(&query, |source| source.should_list(&query))
.await
}
pub(crate) async fn list_orchestrator_for_turn(
&self,
query: SkillListQuery,
) -> SkillProviderResult<SkillCatalog> {
let mut catalog = SkillCatalog::default();
for source in self
.sources
.iter()
.filter(|source| source.should_list(&query))
.filter(|source| source.kind == SkillSourceKind::Orchestrator)
{
let source_catalog = source.provider.list(query.clone()).await.map_err(|err| {
SkillProviderError::new(format!(
"{} skills unavailable: {}",
source.label, err.message
))
})?;
catalog.extend(source_catalog);
}
Ok(catalog)
}
async fn list_matching(
&self,
query: &SkillListQuery,
should_list: impl Fn(&SkillProviderSource) -> bool,
) -> SkillCatalog {
let mut catalog = SkillCatalog::default();
for source in self.sources.iter().filter(|source| should_list(source)) {
extend_catalog(
&mut catalog,
source.provider.list(query.clone()).await,
+15
View File
@@ -1,9 +1,12 @@
use codex_core::config::Config;
use codex_protocol::capabilities::SelectedCapabilityRoot;
use std::future::Future;
use std::sync::Mutex;
use tokio::sync::OnceCell;
use crate::catalog::SkillCatalog;
use crate::catalog::SkillCatalogEntry;
use crate::catalog::SkillProviderError;
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct SkillsExtensionConfig {
@@ -24,6 +27,7 @@ impl SkillsExtensionConfig {
pub(crate) struct SkillsThreadState {
config: Mutex<SkillsExtensionConfig>,
selected_roots: Vec<SelectedCapabilityRoot>,
orchestrator_catalog: OnceCell<SkillCatalog>,
}
impl SkillsThreadState {
@@ -34,6 +38,7 @@ impl SkillsThreadState {
Self {
config: Mutex::new(config),
selected_roots,
orchestrator_catalog: OnceCell::new(),
}
}
@@ -54,6 +59,16 @@ impl SkillsThreadState {
pub(crate) fn selected_roots(&self) -> &[SelectedCapabilityRoot] {
&self.selected_roots
}
pub(crate) async fn orchestrator_catalog_snapshot(
&self,
initialize: impl Future<Output = Result<SkillCatalog, SkillProviderError>> + Send,
) -> Result<SkillCatalog, SkillProviderError> {
self.orchestrator_catalog
.get_or_try_init(|| initialize)
.await
.cloned()
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
@@ -224,7 +224,8 @@ async fn executor_provider_reads_from_the_environment_instance_used_for_listing(
host: None,
include_host_skills: false,
include_bundled_skills: true,
include_remote_skills: false,
include_orchestrator_skills: false,
mcp_resources: None,
})
.await
.expect("list executor skills");
@@ -246,6 +247,7 @@ async fn executor_provider_reads_from_the_environment_instance_used_for_listing(
package: entry.id,
resource: resource.clone(),
host: None,
mcp_resources: None,
})
.await
.expect("read bound executor skill"),
@@ -272,7 +274,6 @@ async fn selected_root_id_distinguishes_identical_executor_paths() {
Arc::new(EnvironmentManager::default_for_tests()),
/*restriction_product*/ None,
);
let catalog = provider
.list(SkillListQuery {
turn_id: "turn-1".to_string(),
@@ -289,7 +290,8 @@ async fn selected_root_id_distinguishes_identical_executor_paths() {
host: None,
include_host_skills: false,
include_bundled_skills: true,
include_remote_skills: false,
include_orchestrator_skills: false,
mcp_resources: None,
})
.await
.expect("list executor skills");
+85 -1
View File
@@ -24,6 +24,7 @@ use codex_skills_extension::catalog::SkillAuthority;
use codex_skills_extension::catalog::SkillCatalog;
use codex_skills_extension::catalog::SkillCatalogEntry;
use codex_skills_extension::catalog::SkillPackageId;
use codex_skills_extension::catalog::SkillProviderError;
use codex_skills_extension::catalog::SkillReadResult;
use codex_skills_extension::catalog::SkillResourceId;
use codex_skills_extension::catalog::SkillSearchResult;
@@ -144,6 +145,8 @@ async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_in
warnings: Vec::new(),
},
read_requests: Arc::clone(&read_requests),
list_calls: None,
fail_first_list: false,
});
let providers = SkillProviders::new().with_executor_provider(executor_provider);
let mut builder = ExtensionRegistryBuilder::new();
@@ -239,6 +242,70 @@ async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_in
Ok(())
}
#[tokio::test]
async fn orchestrator_catalog_snapshot_retries_failure_then_is_reused() -> TestResult {
let list_calls = Arc::new(AtomicUsize::new(0));
let providers =
SkillProviders::new().with_orchestrator_provider(Arc::new(StaticSkillProvider {
catalog: SkillCatalog {
entries: vec![test_entry(
SkillSourceKind::Orchestrator,
"codex_apps",
"orchestrator/first",
"skill://orchestrator/first/SKILL.md",
)],
warnings: Vec::new(),
},
read_requests: Arc::new(Mutex::new(Vec::new())),
list_calls: Some(Arc::clone(&list_calls)),
fail_first_list: true,
}));
let mut builder = ExtensionRegistryBuilder::new();
install_with_providers(&mut builder, providers);
let registry = builder.build();
let session_store = ExtensionData::new("session");
let thread_store = ExtensionData::new("thread");
let session_source = SessionSource::Cli;
let config = default_config().await?;
registry.thread_lifecycle_contributors()[0]
.on_thread_start(ThreadStartInput {
config: &config,
session_source: &session_source,
persistent_thread_state_available: true,
session_store: &session_store,
thread_store: &thread_store,
})
.await;
let initial_fragments = registry.context_contributors()[0]
.contribute(&session_store, &thread_store)
.await;
assert!(initial_fragments.is_empty());
for turn_id in ["turn-1", "turn-2"] {
let fragments = registry.turn_input_contributors()[0]
.contribute(
TurnInputContext {
turn_id: turn_id.to_string(),
user_input: vec![UserInput::Text {
text: "$first".to_string(),
text_elements: Vec::new(),
}],
environments: Vec::new(),
},
&session_store,
&thread_store,
&ExtensionData::new(turn_id),
)
.await;
assert_eq!(1, fragments.len());
assert!(fragments[0].render().contains("<name>first</name>"));
}
assert_eq!(2, list_calls.load(Ordering::Relaxed));
Ok(())
}
#[tokio::test]
async fn root_qualified_locator_selects_only_the_matching_executor_skill() -> TestResult {
let read_requests = Arc::new(Mutex::new(Vec::new()));
@@ -262,6 +329,8 @@ async fn root_qualified_locator_selects_only_the_matching_executor_skill() -> Te
warnings: Vec::new(),
},
read_requests: Arc::clone(&read_requests),
list_calls: None,
fail_first_list: false,
});
let providers = SkillProviders::new().with_executor_provider(executor_provider);
let mut builder = ExtensionRegistryBuilder::new();
@@ -346,6 +415,8 @@ async fn prompt_hidden_skill_can_still_be_invoked() -> TestResult {
warnings: Vec::new(),
},
read_requests: Arc::clone(&read_requests),
list_calls: None,
fail_first_list: false,
});
let providers = SkillProviders::new().with_host_provider(provider);
let mut builder = ExtensionRegistryBuilder::new();
@@ -401,12 +472,25 @@ async fn prompt_hidden_skill_can_still_be_invoked() -> TestResult {
struct StaticSkillProvider {
catalog: SkillCatalog,
read_requests: Arc<Mutex<Vec<SkillReadRequest>>>,
list_calls: Option<Arc<AtomicUsize>>,
fail_first_list: bool,
}
impl SkillProvider for StaticSkillProvider {
fn list(&self, _query: SkillListQuery) -> SkillProviderFuture<'_, SkillCatalog> {
let list_call = self
.list_calls
.as_ref()
.map(|list_calls| list_calls.fetch_add(1, Ordering::Relaxed));
let fail = self.fail_first_list && list_call == Some(0);
let catalog = self.catalog.clone();
Box::pin(async move { Ok(catalog) })
Box::pin(async move {
if fail {
Err(SkillProviderError::new("temporary orchestrator failure"))
} else {
Ok(catalog)
}
})
}
fn read(&self, request: SkillReadRequest) -> SkillProviderFuture<'_, SkillReadResult> {