Overlap executor skill reads with namespace discovery (#30225)

## Why

Environment skill discovery needs two independent pieces of information:

- plugin namespaces from `plugin.json` files; and
- skill metadata from each `SKILL.md` file.

Today these happen in sequence. Codex waits for every plugin namespace
lookup to finish before it starts reading any skill files. On a remote
executor, that creates an avoidable network-latency barrier.

```text
before: walk -> namespace lookups -> skill reads -> build catalog
after:  walk -> namespace lookups ─┐
             -> skill reads ───────┴-> build catalog
```

## What changes

- Read and parse skill files without waiting for plugin namespace
discovery.
- Resolve root and nested plugin namespaces concurrently.
- Join both results only when constructing the final qualified skill
names.
- Keep the existing 64-skill concurrency bound, output ordering,
warnings, metadata behavior, and namespace rules.

## Testing

The regression test makes plugin manifest lookup wait until a `SKILL.md`
read has started. The old serialized pipeline would time out; the new
pipeline completes and still returns the correctly namespaced skill.

`just test -p codex-core-skills` passes all 111 tests.

## Out of scope

This does not add an exec-server endpoint, batch filesystem calls, or
reduce the number of files transferred. A frontmatter-only read or
server-side skill catalog can remain a separate follow-up if benchmarks
show that transferred bytes are the next bottleneck.
This commit is contained in:
jif
2026-06-26 19:37:59 +01:00
committed by GitHub
Unverified
parent d9cf931d0e
commit a938d5f607
2 changed files with 158 additions and 49 deletions
+71 -46
View File
@@ -45,6 +45,15 @@ struct DiscoveredEnvironmentSkill {
metadata: SkillMetadataDiscovery,
}
struct ParsedEnvironmentSkill {
path_to_skills_md: PathUri,
base_name: String,
description: String,
short_description: Option<String>,
dependencies: Option<SkillDependencies>,
policy: Option<SkillPolicy>,
}
enum SkillMetadataDiscovery {
Present(PathUri),
Absent,
@@ -81,11 +90,12 @@ impl EnvironmentSkillMetadata {
None => true,
}
}
}
async fn parse(
impl ParsedEnvironmentSkill {
async fn load(
file_system: &dyn ExecutorFileSystem,
skill: &DiscoveredEnvironmentSkill,
plugin_namespace: Option<&str>,
) -> Result<Self, String> {
let (contents, discovered_metadata) = match &skill.metadata {
SkillMetadataDiscovery::Present(metadata_path) => {
@@ -106,11 +116,6 @@ impl EnvironmentSkillMetadata {
short_description,
} = parse_skill_frontmatter_metadata_inner(&contents, || default_skill_name(&skill.path))
.map_err(|err| err.to_string())?;
let name = plugin_namespace
.map(|namespace| format!("{namespace}:{base_name}"))
.unwrap_or(base_name);
validate_len(&name, MAX_QUALIFIED_NAME_LEN, "qualified name")
.map_err(|err| err.to_string())?;
let (dependencies, policy) = match &skill.metadata {
SkillMetadataDiscovery::Present(_) | SkillMetadataDiscovery::Absent => {
discovered_metadata
@@ -122,7 +127,7 @@ impl EnvironmentSkillMetadata {
Ok(Self {
path_to_skills_md: skill.path.clone(),
name,
base_name,
description,
short_description,
dependencies,
@@ -245,38 +250,55 @@ pub async fn load_environment_skills_from_root(
}
let namespace_roots = discovery.namespace_roots;
let namespace_lookups = join_all(namespace_roots.iter().map(|namespace_root| async {
(
namespace_root.clone(),
plugin_namespace_for_skill_uri(file_system, namespace_root).await,
)
}))
.await;
let plugin_lookups = join_all(
discovery
.plugin_roots
.iter()
.filter(|plugin_root| skill_ancestors.contains(*plugin_root))
.filter(|plugin_root| !namespace_roots.contains(*plugin_root))
.map(|plugin_root| async {
(
plugin_root.clone(),
plugin_namespace_for_root_uri(file_system, plugin_root).await,
)
}),
)
.await;
let plugin_namespaces = namespace_lookups
.into_iter()
.chain(plugin_lookups)
.filter_map(|(plugin_root, namespace)| namespace.map(|namespace| (plugin_root, namespace)))
.collect::<HashMap<_, _>>();
let plugin_namespaces = async {
let namespace_lookups = join_all(namespace_roots.iter().map(|namespace_root| async {
(
namespace_root.clone(),
plugin_namespace_for_skill_uri(file_system, namespace_root).await,
)
}));
let plugin_lookups = join_all(
discovery
.plugin_roots
.iter()
.filter(|plugin_root| skill_ancestors.contains(*plugin_root))
.filter(|plugin_root| !namespace_roots.contains(*plugin_root))
.map(|plugin_root| async {
(
plugin_root.clone(),
plugin_namespace_for_root_uri(file_system, plugin_root).await,
)
}),
);
let (namespace_lookups, plugin_lookups) = tokio::join!(namespace_lookups, plugin_lookups);
namespace_lookups
.into_iter()
.chain(plugin_lookups)
.filter_map(|(plugin_root, namespace)| {
namespace.map(|namespace| (plugin_root, namespace))
})
.collect::<HashMap<_, _>>()
};
// Remote executors can multiplex these independent per-skill reads, so polling a bounded
// number together allows the I/O for each skill and its metadata to happen concurrently.
let skill_results = futures::stream::iter(discovery.skills)
.map(|skill| {
let mut ancestor = skill.path.parent();
let path = skill.path.clone();
async move {
(
path,
ParsedEnvironmentSkill::load(file_system, &skill).await,
)
}
})
.buffered(MAX_CONCURRENT_SKILL_LOADS)
.collect::<Vec<_>>();
let (plugin_namespaces, skill_results) = tokio::join!(plugin_namespaces, skill_results);
for (path, result) in skill_results {
let result = result.and_then(|skill| {
let mut ancestor = skill.path_to_skills_md.parent();
let plugin_namespace = loop {
let Some(current) = ancestor else {
break None;
@@ -286,18 +308,21 @@ pub async fn load_environment_skills_from_root(
}
ancestor = current.parent();
};
let path = skill.path.clone();
async move {
let result =
EnvironmentSkillMetadata::parse(file_system, &skill, plugin_namespace).await;
(path, result)
}
})
.buffered(MAX_CONCURRENT_SKILL_LOADS)
.collect::<Vec<_>>()
.await;
let name = plugin_namespace
.map(|namespace| format!("{namespace}:{}", skill.base_name))
.unwrap_or(skill.base_name);
validate_len(&name, MAX_QUALIFIED_NAME_LEN, "qualified name")
.map_err(|err| err.to_string())?;
for (path, result) in skill_results {
Ok(EnvironmentSkillMetadata {
path_to_skills_md: skill.path_to_skills_md,
name,
description: skill.description,
short_description: skill.short_description,
dependencies: skill.dependencies,
policy: skill.policy,
})
});
match result {
Ok(skill) if skill.matches_product_restriction(restriction_product) => {
outcome.skills.push(skill);
@@ -1,7 +1,9 @@
use std::fs;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_core_skills::loader::EnvironmentSkillMetadata;
use codex_core_skills::loader::load_environment_skills_from_root;
@@ -20,12 +22,22 @@ use codex_exec_server::WalkOutcome;
use codex_utils_path_uri::PathUri;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
use tokio::sync::Notify;
#[derive(Clone, Copy)]
enum ManifestMetadataBehavior {
Immediate,
WaitForSkillRead,
}
struct RecordingFileSystem<'a> {
inner: &'a dyn ExecutorFileSystem,
read_files: Mutex<Vec<PathUri>>,
metadata_files: Mutex<Vec<PathUri>>,
walks: AtomicUsize,
manifest_metadata_behavior: ManifestMetadataBehavior,
skill_read_started: AtomicBool,
skill_read_started_notify: Notify,
}
#[derive(Debug, PartialEq, Eq)]
@@ -36,12 +48,18 @@ struct FileSystemCalls {
}
impl<'a> RecordingFileSystem<'a> {
fn new(inner: &'a dyn ExecutorFileSystem) -> Self {
fn new(
inner: &'a dyn ExecutorFileSystem,
manifest_metadata_behavior: ManifestMetadataBehavior,
) -> Self {
Self {
inner,
read_files: Mutex::new(Vec::new()),
metadata_files: Mutex::new(Vec::new()),
walks: AtomicUsize::new(0),
manifest_metadata_behavior,
skill_read_started: AtomicBool::new(false),
skill_read_started_notify: Notify::new(),
}
}
@@ -84,6 +102,10 @@ impl ExecutorFileSystem for RecordingFileSystem<'_> {
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(path.clone());
if path.basename().as_deref() == Some("SKILL.md") {
self.skill_read_started.store(true, Ordering::Release);
self.skill_read_started_notify.notify_waiters();
}
self.inner.read_file(path, sandbox)
}
@@ -122,6 +144,22 @@ impl ExecutorFileSystem for RecordingFileSystem<'_> {
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(path.clone());
if matches!(
self.manifest_metadata_behavior,
ManifestMetadataBehavior::WaitForSkillRead
) && path.basename().as_deref() == Some("plugin.json")
{
return Box::pin(async move {
loop {
let notified = self.skill_read_started_notify.notified();
if self.skill_read_started.load(Ordering::Acquire) {
break;
}
notified.await;
}
self.inner.get_metadata(path, sandbox).await
});
}
self.inner.get_metadata(path, sandbox)
}
@@ -201,7 +239,8 @@ async fn loads_nearest_plugin_namespaces_without_reading_unused_sibling_manifest
.expect("skill");
}
let file_system = RecordingFileSystem::new(LOCAL_FS.as_ref());
let file_system =
RecordingFileSystem::new(LOCAL_FS.as_ref(), ManifestMetadataBehavior::Immediate);
let root_uri = PathUri::from_host_native_path(root.path()).expect("root URI");
let outcome = load_environment_skills_from_root(
&file_system,
@@ -280,7 +319,8 @@ async fn reuses_walk_inventory_for_missing_skill_metadata() {
skill_paths.push(skill_path);
}
let file_system = RecordingFileSystem::new(LOCAL_FS.as_ref());
let file_system =
RecordingFileSystem::new(LOCAL_FS.as_ref(), ManifestMetadataBehavior::Immediate);
let root_uri = PathUri::from_host_native_path(root.path()).expect("root URI");
let outcome = load_environment_skills_from_root(
&file_system,
@@ -327,3 +367,47 @@ async fn reuses_walk_inventory_for_missing_skill_metadata() {
}
);
}
#[tokio::test]
async fn reads_skill_files_while_resolving_plugin_namespaces() {
let root = tempdir().expect("tempdir");
let manifest_path = root.path().join(".codex-plugin/plugin.json");
fs::create_dir_all(manifest_path.parent().expect("manifest parent")).expect("manifest dir");
fs::write(&manifest_path, r#"{"name":"parallel"}"#).expect("manifest");
let skill_path = root.path().join("demo/SKILL.md");
fs::create_dir_all(skill_path.parent().expect("skill parent")).expect("skill dir");
fs::write(
&skill_path,
"---\nname: demo\ndescription: demo skill.\n---\n",
)
.expect("skill");
let file_system = RecordingFileSystem::new(
LOCAL_FS.as_ref(),
ManifestMetadataBehavior::WaitForSkillRead,
);
let root_uri = PathUri::from_host_native_path(root.path()).expect("root URI");
let outcome = tokio::time::timeout(
Duration::from_secs(5),
load_environment_skills_from_root(
&file_system,
&root_uri,
/*restriction_product*/ None,
),
)
.await
.expect("skill reads should start before namespace resolution finishes");
assert_eq!(outcome.warnings, Vec::<String>::new());
assert_eq!(
outcome.skills,
vec![EnvironmentSkillMetadata {
path_to_skills_md: PathUri::from_host_native_path(skill_path).unwrap(),
name: "parallel:demo".to_string(),
description: "demo skill.".to_string(),
short_description: None,
dependencies: None,
policy: None,
}]
);
}