mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
skills: hide orchestrator skills with a local executor (#28333)
## Why App-server threads without a local executor need orchestrator-owned skills from the hosted `codex_apps` MCP server. Threads with the local executor already discover installed skills from the local filesystem. After the orchestrator skill provider was enabled for every app-server thread, local-executor threads also received the hosted skill catalog and the `skills.list` and `skills.read` tools. This changed the existing local behavior and could expose a second hosted copy of a skill that was already installed locally. ## What changed - Expose the thread's selected execution environments to extensions at thread startup. - Enable orchestrator skills only when the reserved local environment is not selected. - Apply that decision consistently to hosted skill catalog discovery, explicit skill injection, and the `skills.list` and `skills.read` tools. ## Verification - The existing no-executor app-server test continues to verify hosted skill discovery, invocation, and child-resource reads. - A new app-server test verifies that local-executor threads do not receive hosted skill context or `skills.*` tools.
This commit is contained in:
@@ -281,6 +281,78 @@ async fn orchestrator_skill_can_read_referenced_resource_without_an_executor() -
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn local_executor_does_not_expose_orchestrator_skills() -> Result<()> {
|
||||
let responses_server = responses::start_mock_server().await;
|
||||
let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?;
|
||||
let responses_server_uri = responses_server.uri();
|
||||
let (_codex_home, mut mcp) =
|
||||
start_resource_test_app_server(&apps_server_url, &responses_server_uri).await?;
|
||||
|
||||
let thread_start_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response(thread_start_resp)?;
|
||||
|
||||
let response_mock = responses::mount_sse_once(
|
||||
&responses_server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-no-orchestrator-skill"),
|
||||
responses::ev_assistant_message("msg-no-orchestrator-skill", "Done"),
|
||||
responses::ev_completed("resp-no-orchestrator-skill"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let turn_start_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id,
|
||||
input: vec![UserInput::Text {
|
||||
text: format!("Use ${SKILL_NAME}"),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
|
||||
)
|
||||
.await??;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let request = response_mock.single_request();
|
||||
assert!(request.tool_by_name("skills", "list").is_none());
|
||||
assert!(request.tool_by_name("skills", "read").is_none());
|
||||
assert!(
|
||||
request
|
||||
.message_input_texts("developer")
|
||||
.iter()
|
||||
.all(|text| !text.contains(SKILL_NAME))
|
||||
);
|
||||
assert!(
|
||||
request
|
||||
.message_input_texts("user")
|
||||
.iter()
|
||||
.all(|text| !text.contains(SKILL_MARKER))
|
||||
);
|
||||
|
||||
apps_server_handle.abort();
|
||||
let _ = apps_server_handle.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn mcp_resource_read_returns_resource_contents_without_thread() -> Result<()> {
|
||||
let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?;
|
||||
|
||||
@@ -969,6 +969,7 @@ impl Session {
|
||||
config: config.as_ref(),
|
||||
session_source: &session_configuration.session_source,
|
||||
persistent_thread_state_available: state_db_ctx.is_some(),
|
||||
environments: session_configuration.environment_selections(),
|
||||
session_store: &session_extension_data,
|
||||
thread_store: &thread_extension_data,
|
||||
}).await;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::ExtensionData;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
|
||||
/// Input supplied when the host starts a runtime for a thread.
|
||||
pub struct ThreadStartInput<'a, C> {
|
||||
@@ -9,6 +10,8 @@ pub struct ThreadStartInput<'a, C> {
|
||||
pub session_source: &'a SessionSource,
|
||||
/// Whether persistent thread-scoped state is available for this thread.
|
||||
pub persistent_thread_state_available: bool,
|
||||
/// Execution environments selected for this thread.
|
||||
pub environments: &'a [TurnEnvironmentSelection],
|
||||
/// Store scoped to the host session runtime.
|
||||
pub session_store: &'a ExtensionData,
|
||||
/// Store scoped to this thread runtime.
|
||||
|
||||
@@ -1130,6 +1130,7 @@ async fn installed_tools_with_start(
|
||||
config: &(),
|
||||
session_source: &session_source,
|
||||
persistent_thread_state_available,
|
||||
environments: &[],
|
||||
session_store: &session_store,
|
||||
thread_store: &thread_store,
|
||||
})
|
||||
@@ -1182,6 +1183,7 @@ impl GoalExtensionHarness {
|
||||
config: &(),
|
||||
session_source: &session_source,
|
||||
persistent_thread_state_available: true,
|
||||
environments: &[],
|
||||
session_store: &session_store,
|
||||
thread_store: &thread_store,
|
||||
})
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::sync::Arc;
|
||||
|
||||
use codex_core_skills::HostLoadedSkills;
|
||||
use codex_core_skills::injection::InjectedHostSkillPrompts;
|
||||
use codex_exec_server::LOCAL_ENVIRONMENT_ID;
|
||||
use codex_extension_api::ConfigContributor;
|
||||
use codex_extension_api::ContextContributor;
|
||||
use codex_extension_api::ContextualUserFragment;
|
||||
@@ -60,9 +61,14 @@ where
|
||||
.get::<Vec<SelectedCapabilityRoot>>()
|
||||
.map(|selected_roots| selected_roots.as_ref().clone())
|
||||
.unwrap_or_default();
|
||||
let orchestrator_skills_enabled = !input
|
||||
.environments
|
||||
.iter()
|
||||
.any(|environment| environment.environment_id == LOCAL_ENVIRONMENT_ID);
|
||||
input.thread_store.insert(SkillsThreadState::new(
|
||||
(self.config_from_host)(input.config),
|
||||
selected_roots,
|
||||
orchestrator_skills_enabled,
|
||||
));
|
||||
})
|
||||
}
|
||||
@@ -83,7 +89,12 @@ where
|
||||
if let Some(state) = thread_store.get::<SkillsThreadState>() {
|
||||
state.set_config(next_config);
|
||||
} else {
|
||||
thread_store.insert(SkillsThreadState::new(next_config, Vec::new()));
|
||||
let orchestrator_skills_enabled = true;
|
||||
thread_store.insert(SkillsThreadState::new(
|
||||
next_config,
|
||||
Vec::new(),
|
||||
orchestrator_skills_enabled,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -113,7 +124,7 @@ where
|
||||
host: None,
|
||||
include_host_skills: false,
|
||||
include_bundled_skills: config.bundled_skills_enabled,
|
||||
include_orchestrator_skills: true,
|
||||
include_orchestrator_skills: thread_state.orchestrator_skills_enabled(),
|
||||
mcp_resources: session_store.get::<McpResourceClient>(),
|
||||
},
|
||||
&thread_state,
|
||||
@@ -137,9 +148,13 @@ where
|
||||
fn tools(
|
||||
&self,
|
||||
session_store: &ExtensionData,
|
||||
_thread_store: &ExtensionData,
|
||||
thread_store: &ExtensionData,
|
||||
) -> Vec<Arc<dyn ToolExecutor<ToolCall>>> {
|
||||
if !self.providers.has_orchestrator_provider() {
|
||||
if !self.providers.has_orchestrator_provider()
|
||||
|| !thread_store
|
||||
.get::<SkillsThreadState>()
|
||||
.is_some_and(|state| state.orchestrator_skills_enabled())
|
||||
{
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
@@ -174,7 +189,7 @@ where
|
||||
host: host_loaded_skills.clone(),
|
||||
include_host_skills: true,
|
||||
include_bundled_skills: config.bundled_skills_enabled,
|
||||
include_orchestrator_skills: true,
|
||||
include_orchestrator_skills: thread_state.orchestrator_skills_enabled(),
|
||||
mcp_resources: session_store.get::<McpResourceClient>(),
|
||||
};
|
||||
let catalog = self.list_skills(query, &thread_state).await;
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::catalog::SkillProviderError;
|
||||
pub(crate) struct SkillsThreadState {
|
||||
config: Mutex<SkillsExtensionConfig>,
|
||||
selected_roots: Vec<SelectedCapabilityRoot>,
|
||||
orchestrator_skills_enabled: bool,
|
||||
orchestrator_catalog: OnceCell<SkillCatalog>,
|
||||
}
|
||||
|
||||
@@ -19,10 +20,12 @@ impl SkillsThreadState {
|
||||
pub(crate) fn new(
|
||||
config: SkillsExtensionConfig,
|
||||
selected_roots: Vec<SelectedCapabilityRoot>,
|
||||
orchestrator_skills_enabled: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
config: Mutex::new(config),
|
||||
selected_roots,
|
||||
orchestrator_skills_enabled,
|
||||
orchestrator_catalog: OnceCell::new(),
|
||||
}
|
||||
}
|
||||
@@ -45,6 +48,10 @@ impl SkillsThreadState {
|
||||
&self.selected_roots
|
||||
}
|
||||
|
||||
pub(crate) fn orchestrator_skills_enabled(&self) -> bool {
|
||||
self.orchestrator_skills_enabled
|
||||
}
|
||||
|
||||
pub(crate) async fn orchestrator_catalog_snapshot(
|
||||
&self,
|
||||
initialize: impl Future<Output = Result<SkillCatalog, SkillProviderError>> + Send,
|
||||
|
||||
@@ -74,6 +74,7 @@ async fn installed_extension_uses_host_loaded_skills() -> TestResult {
|
||||
config: &config,
|
||||
session_source: &session_source,
|
||||
persistent_thread_state_available: true,
|
||||
environments: &[],
|
||||
session_store: &session_store,
|
||||
thread_store: &thread_store,
|
||||
})
|
||||
@@ -175,6 +176,7 @@ async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_in
|
||||
config: &config,
|
||||
session_source: &session_source,
|
||||
persistent_thread_state_available: true,
|
||||
environments: &[],
|
||||
session_store: &session_store,
|
||||
thread_store: &thread_store,
|
||||
})
|
||||
@@ -285,6 +287,7 @@ async fn orchestrator_catalog_snapshot_caches_failure() -> TestResult {
|
||||
config: &config,
|
||||
session_source: &session_source,
|
||||
persistent_thread_state_available: true,
|
||||
environments: &[],
|
||||
session_store: &session_store,
|
||||
thread_store: &thread_store,
|
||||
})
|
||||
@@ -376,6 +379,7 @@ async fn root_qualified_locator_selects_only_the_matching_executor_skill() -> Te
|
||||
config: &config,
|
||||
session_source: &session_source,
|
||||
persistent_thread_state_available: true,
|
||||
environments: &[],
|
||||
session_store: &session_store,
|
||||
thread_store: &thread_store,
|
||||
})
|
||||
@@ -450,6 +454,7 @@ async fn prompt_hidden_skill_can_still_be_invoked() -> TestResult {
|
||||
config: &config,
|
||||
session_source: &session_source,
|
||||
persistent_thread_state_available: true,
|
||||
environments: &[],
|
||||
session_store: &session_store,
|
||||
thread_store: &thread_store,
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user