diff --git a/codex-rs/core-skills/src/loader/environment.rs b/codex-rs/core-skills/src/loader/environment.rs index 93ab14e9a..96caa28ee 100644 --- a/codex-rs/core-skills/src/loader/environment.rs +++ b/codex-rs/core-skills/src/loader/environment.rs @@ -23,7 +23,6 @@ use super::ParsedSkillFrontmatter; use super::SKILLS_FILENAME; use super::SKILLS_METADATA_DIR; use super::SKILLS_METADATA_FILENAME; -use super::SkillFileDiscovery; use super::SkillMetadataFile; use super::parse_skill_frontmatter_metadata_inner; use super::resolve_dependencies; @@ -34,6 +33,24 @@ use super::validate_len; const MAX_SKILLS_ENTRIES_PER_ROOT: usize = 20_000; const MAX_CONCURRENT_SKILL_LOADS: usize = 64; +struct EnvironmentSkillDiscovery { + skills: Vec, + plugin_roots: HashSet, + namespace_roots: HashSet, + warnings: Vec, +} + +struct DiscoveredEnvironmentSkill { + path: PathUri, + metadata: SkillMetadataDiscovery, +} + +enum SkillMetadataDiscovery { + Present(PathUri), + Absent, + Probe(PathUri), +} + /// URI-native metadata for one skill owned by an execution environment. #[derive(Clone, Debug, PartialEq, Eq)] pub struct EnvironmentSkillMetadata { @@ -67,28 +84,44 @@ impl EnvironmentSkillMetadata { async fn parse( file_system: &dyn ExecutorFileSystem, - path: &PathUri, + skill: &DiscoveredEnvironmentSkill, plugin_namespace: Option<&str>, ) -> Result { - let contents = file_system - .read_file_text(path, /*sandbox*/ None) - .await - .map_err(|err| format!("failed to read file: {err}"))?; + let (contents, discovered_metadata) = match &skill.metadata { + SkillMetadataDiscovery::Present(metadata_path) => { + let (contents, metadata) = tokio::join!( + read_skill_contents(file_system, &skill.path), + read_skill_metadata(file_system, metadata_path), + ); + (contents?, metadata) + } + SkillMetadataDiscovery::Absent | SkillMetadataDiscovery::Probe(_) => ( + read_skill_contents(file_system, &skill.path).await?, + (None, None), + ), + }; let ParsedSkillFrontmatter { name: base_name, description, short_description, - } = parse_skill_frontmatter_metadata_inner(&contents, || default_skill_name(path)) + } = parse_skill_frontmatter_metadata_inner(&contents, || default_skill_name(&skill.path)) .map_err(|err| err.to_string())?; let name = plugin_namespace .map(|namespace| format!("{namespace}:{base_name}")) .unwrap_or(base_name); validate_len(&name, MAX_QUALIFIED_NAME_LEN, "qualified name") .map_err(|err| err.to_string())?; - let (dependencies, policy) = load_skill_metadata(file_system, path).await; + let (dependencies, policy) = match &skill.metadata { + SkillMetadataDiscovery::Present(_) | SkillMetadataDiscovery::Absent => { + discovered_metadata + } + SkillMetadataDiscovery::Probe(metadata_path) => { + probe_skill_metadata(file_system, metadata_path).await + } + }; Ok(Self { - path_to_skills_md: path.clone(), + path_to_skills_md: skill.path.clone(), name, description, short_description, @@ -125,6 +158,7 @@ pub async fn load_environment_skills_from_root( .await { Ok(walk) => { + let inventory_complete = !walk.truncated && walk.errors.is_empty(); let mut warnings = walk .errors .into_iter() @@ -141,10 +175,13 @@ pub async fn load_environment_skills_from_root( )); } let mut skill_files = Vec::new(); + let mut file_paths = HashSet::new(); + let mut directory_paths = HashSet::new(); let mut plugin_roots = HashSet::new(); for entry in walk.entries { match entry.kind { WalkEntryKind::Directory => { + directory_paths.insert(entry.path.clone()); if DISCOVERABLE_PLUGIN_MANIFEST_PATHS .iter() .any(|path| path.split('/').next() == entry.path.basename().as_deref()) @@ -154,40 +191,53 @@ pub async fn load_environment_skills_from_root( } } WalkEntryKind::File => { + file_paths.insert(entry.path.clone()); if entry.path.basename().as_deref() == Some(SKILLS_FILENAME) { skill_files.push(entry.path); } } } } - SkillFileDiscovery { - skill_files, + let skills = skill_files + .into_iter() + .map(|path| DiscoveredEnvironmentSkill { + metadata: discover_skill_metadata( + &path, + &file_paths, + &directory_paths, + inventory_complete, + ), + path, + }) + .collect(); + EnvironmentSkillDiscovery { + skills, plugin_roots, namespace_roots: HashSet::from([root.clone()]), warnings, } } - Err(error) if error.kind() == io::ErrorKind::NotFound => SkillFileDiscovery { - skill_files: Vec::new(), + Err(error) if error.kind() == io::ErrorKind::NotFound => EnvironmentSkillDiscovery { + skills: Vec::new(), plugin_roots: HashSet::new(), namespace_roots: HashSet::new(), warnings: Vec::new(), }, - Err(error) => SkillFileDiscovery { - skill_files: Vec::new(), + Err(error) => EnvironmentSkillDiscovery { + skills: Vec::new(), plugin_roots: HashSet::new(), namespace_roots: HashSet::new(), warnings: vec![format!("failed to walk skills root {root}: {error:#}")], }, }; outcome.warnings.extend(discovery.warnings); - if discovery.skill_files.is_empty() { + if discovery.skills.is_empty() { return outcome; } let mut skill_ancestors = HashSet::new(); - for skill_path in &discovery.skill_files { - let mut ancestor = skill_path.parent(); + for skill in &discovery.skills { + let mut ancestor = skill.path.parent(); while let Some(path) = ancestor { skill_ancestors.insert(path.clone()); ancestor = path.parent(); @@ -224,9 +274,9 @@ pub async fn load_environment_skills_from_root( // Remote executors can multiplex these independent per-skill reads, so polling a bounded // number together allows the I/O for each skill and its metadata to happen concurrently. - let skill_results = futures::stream::iter(discovery.skill_files) - .map(|path| { - let mut ancestor = path.parent(); + let skill_results = futures::stream::iter(discovery.skills) + .map(|skill| { + let mut ancestor = skill.path.parent(); let plugin_namespace = loop { let Some(current) = ancestor else { break None; @@ -236,9 +286,10 @@ pub async fn load_environment_skills_from_root( } ancestor = current.parent(); }; + let path = skill.path.clone(); async move { let result = - EnvironmentSkillMetadata::parse(file_system, &path, plugin_namespace).await; + EnvironmentSkillMetadata::parse(file_system, &skill, plugin_namespace).await; (path, result) } }) @@ -267,20 +318,48 @@ pub async fn load_environment_skills_from_root( outcome } -async fn load_skill_metadata( +fn discover_skill_metadata( + skill_path: &PathUri, + file_paths: &HashSet, + directory_paths: &HashSet, + inventory_complete: bool, +) -> SkillMetadataDiscovery { + let Some(skill_dir) = skill_path.parent() else { + return SkillMetadataDiscovery::Absent; + }; + let Ok(metadata_dir) = skill_dir.join(SKILLS_METADATA_DIR) else { + return SkillMetadataDiscovery::Absent; + }; + let Ok(metadata_path) = metadata_dir.join(SKILLS_METADATA_FILENAME) else { + return SkillMetadataDiscovery::Absent; + }; + if file_paths.contains(&metadata_path) { + SkillMetadataDiscovery::Present(metadata_path) + } else if inventory_complete && !directory_paths.contains(&metadata_dir) { + SkillMetadataDiscovery::Absent + } else { + // The walk can omit entries after an error or traversal limit. It also omits file + // symlinks, so keep the existing probe when the metadata directory itself was observed. + SkillMetadataDiscovery::Probe(metadata_path) + } +} + +async fn read_skill_contents( file_system: &dyn ExecutorFileSystem, skill_path: &PathUri, +) -> Result { + file_system + .read_file_text(skill_path, /*sandbox*/ None) + .await + .map_err(|err| format!("failed to read file: {err}")) +} + +async fn probe_skill_metadata( + file_system: &dyn ExecutorFileSystem, + metadata_path: &PathUri, ) -> (Option, Option) { - let Some(skill_dir) = skill_path.parent() else { - return (None, None); - }; - let Ok(metadata_path) = - skill_dir.join(&format!("{SKILLS_METADATA_DIR}/{SKILLS_METADATA_FILENAME}")) - else { - return (None, None); - }; match file_system - .get_metadata(&metadata_path, /*sandbox*/ None) + .get_metadata(metadata_path, /*sandbox*/ None) .await { Ok(metadata) if metadata.is_file => {} @@ -291,8 +370,15 @@ async fn load_skill_metadata( return (None, None); } } + read_skill_metadata(file_system, metadata_path).await +} + +async fn read_skill_metadata( + file_system: &dyn ExecutorFileSystem, + metadata_path: &PathUri, +) -> (Option, Option) { let contents = match file_system - .read_file_text(&metadata_path, /*sandbox*/ None) + .read_file_text(metadata_path, /*sandbox*/ None) .await { Ok(contents) => contents, diff --git a/codex-rs/core-skills/tests/environment_loader.rs b/codex-rs/core-skills/tests/environment_loader.rs index 42d244608..c17802bed 100644 --- a/codex-rs/core-skills/tests/environment_loader.rs +++ b/codex-rs/core-skills/tests/environment_loader.rs @@ -1,5 +1,7 @@ use std::fs; use std::sync::Mutex; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use codex_core_skills::loader::EnvironmentSkillMetadata; use codex_core_skills::loader::load_environment_skills_from_root; @@ -13,6 +15,8 @@ use codex_exec_server::FileSystemSandboxContext; use codex_exec_server::LOCAL_FS; use codex_exec_server::ReadDirectoryEntry; use codex_exec_server::RemoveOptions; +use codex_exec_server::WalkOptions; +use codex_exec_server::WalkOutcome; use codex_utils_path_uri::PathUri; use pretty_assertions::assert_eq; use tempfile::tempdir; @@ -20,6 +24,15 @@ use tempfile::tempdir; struct RecordingFileSystem<'a> { inner: &'a dyn ExecutorFileSystem, read_files: Mutex>, + metadata_files: Mutex>, + walks: AtomicUsize, +} + +#[derive(Debug, PartialEq, Eq)] +struct FileSystemCalls { + walks: usize, + read_files: Vec, + metadata_files: Vec, } impl<'a> RecordingFileSystem<'a> { @@ -27,14 +40,29 @@ impl<'a> RecordingFileSystem<'a> { Self { inner, read_files: Mutex::new(Vec::new()), + metadata_files: Mutex::new(Vec::new()), + walks: AtomicUsize::new(0), } } - fn read_files(&self) -> Vec { - self.read_files + fn calls(&self) -> FileSystemCalls { + let mut read_files = self + .read_files .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) - .clone() + .clone(); + read_files.sort_by_key(ToString::to_string); + let mut metadata_files = self + .metadata_files + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone(); + metadata_files.sort_by_key(ToString::to_string); + FileSystemCalls { + walks: self.walks.load(Ordering::Relaxed), + read_files, + metadata_files, + } } } @@ -90,6 +118,10 @@ impl ExecutorFileSystem for RecordingFileSystem<'_> { path: &'a PathUri, sandbox: Option<&'a FileSystemSandboxContext>, ) -> ExecutorFileSystemFuture<'a, FileMetadata> { + self.metadata_files + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(path.clone()); self.inner.get_metadata(path, sandbox) } @@ -101,6 +133,16 @@ impl ExecutorFileSystem for RecordingFileSystem<'_> { self.inner.read_directory(path, sandbox) } + fn walk<'a>( + &'a self, + path: &'a PathUri, + options: WalkOptions, + sandbox: Option<&'a FileSystemSandboxContext>, + ) -> ExecutorFileSystemFuture<'a, WalkOutcome> { + self.walks.fetch_add(1, Ordering::Relaxed); + self.inner.walk(path, options, sandbox) + } + fn remove<'a>( &'a self, path: &'a PathUri, @@ -200,7 +242,8 @@ async fn loads_nearest_plugin_namespaces_without_reading_unused_sibling_manifest ); let mut manifest_reads = file_system - .read_files() + .calls() + .read_files .into_iter() .filter(|path| path.basename().as_deref() == Some("plugin.json")) .collect::>(); @@ -214,3 +257,73 @@ async fn loads_nearest_plugin_namespaces_without_reading_unused_sibling_manifest expected_manifest_reads.sort_by_key(ToString::to_string); assert_eq!(manifest_reads, expected_manifest_reads); } + +#[tokio::test] +async fn reuses_walk_inventory_for_missing_skill_metadata() { + const SKILL_COUNT: usize = 66; + + let root = tempdir().expect("tempdir"); + let manifest_path = root.path().join(".codex-plugin/plugin.json"); + fs::create_dir_all(manifest_path.parent().expect("manifest parent")).expect("manifest dir"); + fs::write(&manifest_path, r#"{"name":"inventory"}"#).expect("manifest"); + + let mut skill_paths = Vec::new(); + for index in 0..SKILL_COUNT { + let name = format!("skill-{index}"); + let skill_path = root.path().join(&name).join("SKILL.md"); + fs::create_dir_all(skill_path.parent().expect("skill parent")).expect("skill dir"); + fs::write( + &skill_path, + format!("---\nname: {name}\ndescription: {name} skill.\n---\n"), + ) + .expect("skill"); + skill_paths.push(skill_path); + } + + let file_system = RecordingFileSystem::new(LOCAL_FS.as_ref()); + let root_uri = PathUri::from_host_native_path(root.path()).expect("root URI"); + let outcome = load_environment_skills_from_root( + &file_system, + &root_uri, + /*restriction_product*/ None, + ) + .await; + + let mut expected_skills = skill_paths + .iter() + .enumerate() + .map(|(index, skill_path)| EnvironmentSkillMetadata { + path_to_skills_md: PathUri::from_host_native_path(skill_path).unwrap(), + name: format!("inventory:skill-{index}"), + description: format!("skill-{index} skill."), + short_description: None, + dependencies: None, + policy: None, + }) + .collect::>(); + expected_skills.sort_by(|left, right| { + left.name.cmp(&right.name).then_with(|| { + left.path_to_skills_md + .to_string() + .cmp(&right.path_to_skills_md.to_string()) + }) + }); + assert_eq!(outcome.skills, expected_skills); + assert_eq!(outcome.warnings, Vec::::new()); + + let mut expected_read_files = skill_paths + .iter() + .map(|path| PathUri::from_host_native_path(path).unwrap()) + .collect::>(); + let manifest_uri = PathUri::from_host_native_path(manifest_path).unwrap(); + expected_read_files.push(manifest_uri.clone()); + expected_read_files.sort_by_key(ToString::to_string); + assert_eq!( + file_system.calls(), + FileSystemCalls { + walks: 1, + read_files: expected_read_files, + metadata_files: vec![manifest_uri], + } + ); +}