diff --git a/codex-rs/core/src/session/world_state.rs b/codex-rs/core/src/session/world_state.rs index 254197225..10d6a5d0d 100644 --- a/codex-rs/core/src/session/world_state.rs +++ b/codex-rs/core/src/session/world_state.rs @@ -36,12 +36,18 @@ impl Session { ); } let environments = step_context.environments.to_selections(); + let ready_selected_capability_roots = step_context + .selected_capability_roots + .iter() + .map(|root| root.selected_root().clone()) + .collect::>(); for contributor in self.services.extensions.context_contributors() { for section in contributor .contribute_world_state(WorldStateContributionInput { thread_id: self.thread_id(), turn_id: turn_context.sub_id.as_str(), environments: &environments, + ready_selected_capability_roots: &ready_selected_capability_roots, session_store: &self.services.session_extension_data, thread_store: &self.services.thread_extension_data, turn_store: turn_context.extension_data.as_ref(), diff --git a/codex-rs/ext/extension-api/src/contributors/world_state.rs b/codex-rs/ext/extension-api/src/contributors/world_state.rs index b5bb54810..23f180826 100644 --- a/codex-rs/ext/extension-api/src/contributors/world_state.rs +++ b/codex-rs/ext/extension-api/src/contributors/world_state.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use codex_protocol::ThreadId; +use codex_protocol::capabilities::SelectedCapabilityRoot; use codex_protocol::protocol::TurnEnvironmentSelection; use serde_json::Value; @@ -11,6 +12,8 @@ pub struct WorldStateContributionInput<'a> { pub thread_id: ThreadId, pub turn_id: &'a str, pub environments: &'a [TurnEnvironmentSelection], + /// Selected roots whose stable environments are ready in this sampling step. + pub ready_selected_capability_roots: &'a [SelectedCapabilityRoot], pub session_store: &'a ExtensionData, pub thread_store: &'a ExtensionData, pub turn_store: &'a ExtensionData, diff --git a/codex-rs/ext/skills/src/extension.rs b/codex-rs/ext/skills/src/extension.rs index de9facd8c..0b582e938 100644 --- a/codex-rs/ext/skills/src/extension.rs +++ b/codex-rs/ext/skills/src/extension.rs @@ -18,8 +18,9 @@ use codex_extension_api::ToolContributor; use codex_extension_api::ToolExecutor; use codex_extension_api::TurnInputContext; use codex_extension_api::TurnInputContributor; +use codex_extension_api::WorldStateContributionInput; +use codex_extension_api::WorldStateSectionContribution; use codex_mcp::McpResourceClient; -use codex_protocol::capabilities::SelectedCapabilityRoot; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::WarningEvent; @@ -40,9 +41,11 @@ use crate::render::truncate_main_prompt_contents; use crate::render::truncate_utf8_to_bytes; use crate::selection::collect_explicit_skill_mentions; use crate::sources::SkillProviders; +use crate::state::ExecutorSkillsStepState; use crate::state::SkillsThreadState; use crate::state::SkillsTurnState; use crate::tools::skill_tools; +use crate::world_state::executor_skills_world_state_section; struct SkillsExtension { providers: SkillProviders, @@ -56,18 +59,12 @@ where { fn on_thread_start<'a>(&'a self, input: ThreadStartInput<'a, C>) -> ExtensionFuture<'a, ()> { Box::pin(async move { - let selected_roots = input - .thread_store - .get::>() - .map(|selected_roots| selected_roots.as_ref().clone()) - .unwrap_or_default(); let orchestrator_skills_available = !input .environments .iter() .any(|environment| environment.environment_id == LOCAL_ENVIRONMENT_ID); input.thread_store.insert(SkillsThreadState::new( (self.config_from_host)(input.config), - selected_roots, orchestrator_skills_available, )); }) @@ -92,7 +89,6 @@ where let orchestrator_skills_available = true; thread_store.insert(SkillsThreadState::new( next_config, - Vec::new(), orchestrator_skills_available, )); } @@ -120,7 +116,7 @@ where .list_skills( SkillListQuery { turn_id: thread_store.level_id().to_string(), - executor_roots: thread_state.selected_roots().to_vec(), + executor_roots: Vec::new(), host_snapshot: None, include_host_skills: false, include_bundled_skills: config.bundled_skills_enabled, @@ -139,6 +135,39 @@ where .collect() }) } + + fn contribute_world_state<'a>( + &'a self, + input: WorldStateContributionInput<'a>, + ) -> ExtensionFuture<'a, Vec> { + Box::pin(async move { + let Some(thread_state) = input.thread_store.get::() else { + return Vec::new(); + }; + let config = thread_state.config(); + let catalog = thread_state + .executor_catalog_snapshot( + &self.providers, + SkillListQuery { + turn_id: input.turn_id.to_string(), + executor_roots: input.ready_selected_capability_roots.to_vec(), + host_snapshot: None, + include_host_skills: false, + include_bundled_skills: config.bundled_skills_enabled, + include_orchestrator_skills: false, + mcp_resources: input.session_store.get::(), + }, + ) + .await; + input + .turn_store + .insert(ExecutorSkillsStepState(catalog.clone())); + vec![executor_skills_world_state_section( + &catalog, + config.include_instructions, + )] + }) + } } impl ToolContributor for SkillsExtension @@ -187,14 +216,17 @@ where let host_snapshot = turn_store.get::(); let query = SkillListQuery { turn_id: input.turn_id.clone(), - executor_roots: thread_state.selected_roots().to_vec(), + executor_roots: Vec::new(), host_snapshot: host_snapshot.clone(), include_host_skills: true, include_bundled_skills: config.bundled_skills_enabled, include_orchestrator_skills: thread_state.orchestrator_skills_enabled(), mcp_resources: session_store.get::(), }; - let catalog = self.list_skills(query, &thread_state).await; + let mut catalog = self.list_skills(query, &thread_state).await; + if let Some(executor_skills) = turn_store.get::() { + catalog.extend(executor_skills.0.clone()); + } for warning in &catalog.warnings { self.emit_warning(&input.turn_id, warning.clone()); } diff --git a/codex-rs/ext/skills/src/lib.rs b/codex-rs/ext/skills/src/lib.rs index 8abdf6a93..4a004ff9a 100644 --- a/codex-rs/ext/skills/src/lib.rs +++ b/codex-rs/ext/skills/src/lib.rs @@ -8,6 +8,7 @@ mod selection; mod sources; mod state; mod tools; +mod world_state; pub use config::SkillsExtensionConfig; pub use extension::install; diff --git a/codex-rs/ext/skills/src/sources.rs b/codex-rs/ext/skills/src/sources.rs index d558c13b3..a75a047e0 100644 --- a/codex-rs/ext/skills/src/sources.rs +++ b/codex-rs/ext/skills/src/sources.rs @@ -135,6 +135,11 @@ impl SkillProviders { Ok(catalog) } + pub(crate) async fn list_executor_for_turn(&self, query: SkillListQuery) -> SkillCatalog { + self.list_matching(&query, |source| source.kind == SkillSourceKind::Executor) + .await + } + async fn list_matching( &self, query: &SkillListQuery, diff --git a/codex-rs/ext/skills/src/state.rs b/codex-rs/ext/skills/src/state.rs index 0b751ebdc..2f1a84c46 100644 --- a/codex-rs/ext/skills/src/state.rs +++ b/codex-rs/ext/skills/src/state.rs @@ -18,6 +18,7 @@ use crate::catalog::SkillProviderResult; use crate::catalog::SkillReadResult; use crate::catalog::SkillResourceId; use crate::catalog::SkillSourceKind; +use crate::provider::SkillListQuery; use crate::provider::SkillReadRequest; use crate::sources::SkillProviders; @@ -26,21 +27,17 @@ const MAX_CACHED_ORCHESTRATOR_CONTENT_BYTES: usize = 8 * 1024 * 1024; pub(crate) struct SkillsThreadState { config: Mutex, - selected_roots: Vec, orchestrator_skills_available: bool, + executor_cache: Mutex>, orchestrator_cache: Mutex>>, } impl SkillsThreadState { - pub(crate) fn new( - config: SkillsExtensionConfig, - selected_roots: Vec, - orchestrator_skills_available: bool, - ) -> Self { + pub(crate) fn new(config: SkillsExtensionConfig, orchestrator_skills_available: bool) -> Self { Self { config: Mutex::new(config), - selected_roots, orchestrator_skills_available, + executor_cache: Mutex::new(Vec::new()), orchestrator_cache: Mutex::new(None), } } @@ -59,14 +56,33 @@ impl SkillsThreadState { .unwrap_or_else(std::sync::PoisonError::into_inner) = config; } - pub(crate) fn selected_roots(&self) -> &[SelectedCapabilityRoot] { - &self.selected_roots - } - pub(crate) fn orchestrator_skills_enabled(&self) -> bool { self.orchestrator_skills_available && self.config().orchestrator_skills_enabled } + /// Returns catalogs for stable selected roots. + /// + /// The first catalog returned for a root remains cached until this thread state is dropped. + /// Environment availability only controls whether the root is projected into the current + /// step; it never invalidates the cache. There is intentionally no filesystem watcher or + /// content-based invalidation because selected environment roots are treated as stable. + pub(crate) async fn executor_catalog_snapshot( + &self, + providers: &SkillProviders, + mut query: SkillListQuery, + ) -> SkillCatalog { + let roots = std::mem::take(&mut query.executor_roots); + let mut catalog = SkillCatalog::default(); + for root in roots { + query.executor_roots = vec![root.clone()]; + catalog.extend( + self.executor_root_catalog(providers, root, query.clone()) + .await, + ); + } + catalog + } + pub(crate) async fn orchestrator_catalog_snapshot( &self, mcp_resources: Option<&McpResourceClient>, @@ -140,6 +156,42 @@ impl SkillsThreadState { *cache = Some(Arc::clone(&next_cache)); next_cache } + + async fn executor_root_catalog( + &self, + providers: &SkillProviders, + root: SelectedCapabilityRoot, + query: SkillListQuery, + ) -> SkillCatalog { + if let Some(cached) = self + .executor_cache + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .iter() + .find(|cached| cached.root == root) + { + return cached.catalog.clone(); + } + + let discovered = providers.list_executor_for_turn(query).await; + let mut cache = self + .executor_cache + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if let Some(cached) = cache.iter().find(|cached| cached.root == root) { + return cached.catalog.clone(); + } + cache.push(CachedExecutorCatalog { + root, + catalog: discovered.clone(), + }); + discovered + } +} + +struct CachedExecutorCatalog { + root: SelectedCapabilityRoot, + catalog: SkillCatalog, } struct OrchestratorGenerationCache { @@ -204,3 +256,6 @@ pub(crate) struct SkillsTurnState { pub(crate) warnings: Vec, pub(crate) main_prompts_injected: bool, } + +#[derive(Clone, Debug, Default)] +pub(crate) struct ExecutorSkillsStepState(pub(crate) SkillCatalog); diff --git a/codex-rs/ext/skills/src/world_state.rs b/codex-rs/ext/skills/src/world_state.rs new file mode 100644 index 000000000..ce36427b0 --- /dev/null +++ b/codex-rs/ext/skills/src/world_state.rs @@ -0,0 +1,62 @@ +use codex_extension_api::ContextualUserFragment; +use codex_extension_api::PreviousWorldStateSection; +use codex_extension_api::RenderedWorldStateFragment; +use codex_extension_api::WorldStateSectionContribution; +use codex_protocol::protocol::SKILLS_INSTRUCTIONS_CLOSE_TAG; +use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG; +use serde_json::json; + +use crate::catalog::SkillCatalog; +use crate::render::available_skills_fragment; + +pub(crate) const SKILLS_WORLD_STATE_ID: &str = "skills"; +const NO_EXECUTOR_SKILLS_BODY: &str = + "\n## Skills update\nNo selected-environment skills are currently available.\n"; +const HIDDEN_EXECUTOR_SKILLS_BODY: &str = "\n## Skills update\nSelected-environment skills are not listed automatically. Explicit skill mentions can still be resolved when available.\n"; + +pub(crate) fn executor_skills_world_state_section( + catalog: &SkillCatalog, + include_instructions: bool, +) -> WorldStateSectionContribution { + let body = if include_instructions { + available_skills_fragment(catalog).map(|fragment| fragment.body()) + } else { + None + }; + let snapshot = json!({ + "body": body, + "includeInstructions": include_instructions, + }); + + WorldStateSectionContribution::new(SKILLS_WORLD_STATE_ID, snapshot, move |previous| { + let previous_is_absent = matches!(&previous, PreviousWorldStateSection::Absent); + if let PreviousWorldStateSection::Known(previous) = &previous { + let previous_body = previous.get("body").and_then(serde_json::Value::as_str); + let previous_include_instructions = previous + .get("includeInstructions") + .and_then(serde_json::Value::as_bool); + if previous_body == body.as_deref() + && previous_include_instructions == Some(include_instructions) + { + return None; + } + } + + let body = match body.as_deref() { + Some(body) => body, + None if previous_is_absent => return None, + None if !include_instructions => HIDDEN_EXECUTOR_SKILLS_BODY, + None => NO_EXECUTOR_SKILLS_BODY, + }; + Some(RenderedWorldStateFragment::new( + "developer", + (SKILLS_INSTRUCTIONS_OPEN_TAG, SKILLS_INSTRUCTIONS_CLOSE_TAG), + body, + )) + }) + .with_legacy_matcher(|role, text| { + role == "developer" + && text.trim_start().starts_with(SKILLS_INSTRUCTIONS_OPEN_TAG) + && text.trim_end().ends_with(SKILLS_INSTRUCTIONS_CLOSE_TAG) + }) +} diff --git a/codex-rs/ext/skills/tests/skills_extension.rs b/codex-rs/ext/skills/tests/skills_extension.rs index 761dc34d7..7e9ea01c5 100644 --- a/codex-rs/ext/skills/tests/skills_extension.rs +++ b/codex-rs/ext/skills/tests/skills_extension.rs @@ -15,10 +15,12 @@ use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionEventSink; use codex_extension_api::ExtensionRegistryBuilder; use codex_extension_api::NoopTurnItemEmitter; +use codex_extension_api::PreviousWorldStateSection; use codex_extension_api::ThreadStartInput; use codex_extension_api::ToolCall; use codex_extension_api::ToolPayload; use codex_extension_api::TurnInputContext; +use codex_extension_api::WorldStateContributionInput; use codex_protocol::capabilities::CapabilityRootLocation; use codex_protocol::capabilities::SelectedCapabilityRoot; use codex_protocol::protocol::Event; @@ -28,6 +30,7 @@ use codex_protocol::protocol::SKILLS_INSTRUCTIONS_OPEN_TAG; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SkillScope; use codex_protocol::protocol::TruncationPolicy; +use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::user_input::UserInput; use codex_skills_extension::SkillProviders; use codex_skills_extension::SkillsExtensionConfig; @@ -144,9 +147,9 @@ async fn installed_extension_uses_host_service_snapshot() -> TestResult { } #[tokio::test] -async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_input() -> TestResult -{ +async fn selected_executor_catalog_follows_step_availability_and_reuses_its_cache() -> TestResult { let read_requests = Arc::new(Mutex::new(Vec::new())); + let list_calls = Arc::new(AtomicUsize::new(0)); let executor_provider = Arc::new(StaticSkillProvider { catalog: SkillCatalog { entries: vec![test_entry( @@ -158,7 +161,7 @@ async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_in warnings: Vec::new(), }, read_requests: Arc::clone(&read_requests), - list_calls: None, + list_calls: Some(Arc::clone(&list_calls)), fail_first_list: false, }); let providers = SkillProviders::new().with_executor_provider(executor_provider); @@ -168,13 +171,13 @@ async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_in let session_store = ExtensionData::new("session"); let thread_store = ExtensionData::new("thread"); - thread_store.insert(vec![SelectedCapabilityRoot { + let selected_roots = vec![SelectedCapabilityRoot { id: "lint-fix".to_string(), location: CapabilityRootLocation::Environment { environment_id: "env-1".to_string(), path: PathUri::parse("file:///skills/lint-fix").expect("skill root URI"), }, - }]); + }]; let session_source = SessionSource::Cli; let config = default_config(); registry.thread_lifecycle_contributors()[0] @@ -191,20 +194,36 @@ async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_in let prompt_fragments = registry.context_contributors()[0] .contribute_thread_context(&session_store, &thread_store) .await; - assert_eq!(1, prompt_fragments.len()); + assert!(prompt_fragments.is_empty()); + + let turn_store = ExtensionData::new("turn-1"); + let turn_environment = TurnEnvironmentSelection { + environment_id: "turn-env".to_string(), + cwd: PathUri::parse("file:///workspace").expect("cwd URI"), + }; + let available_sections = registry.context_contributors()[0] + .contribute_world_state(WorldStateContributionInput { + thread_id: codex_protocol::ThreadId::new(), + turn_id: "turn-1", + environments: std::slice::from_ref(&turn_environment), + ready_selected_capability_roots: &selected_roots, + session_store: &session_store, + thread_store: &thread_store, + turn_store: &turn_store, + }) + .await; + assert_eq!(1, available_sections.len()); + let available_snapshot = available_sections[0].snapshot().clone(); + let available_fragment = available_sections[0] + .render_diff(PreviousWorldStateSection::Absent) + .ok_or("available skills should render")?; + assert!(available_fragment.body().contains("lint-fix")); assert!( - prompt_fragments[0] - .text() - .starts_with(SKILLS_INSTRUCTIONS_OPEN_TAG) - ); - assert!(prompt_fragments[0].text().contains("lint-fix")); - assert!( - prompt_fragments[0] - .text() + available_fragment + .body() .contains("(environment resource: skill://executor/lint-fix/SKILL.md)") ); - let turn_store = ExtensionData::new("turn-1"); let fragments = registry.turn_input_contributors()[0] .contribute( TurnInputContext { @@ -233,30 +252,86 @@ async fn selected_executor_catalog_is_context_and_selected_entrypoint_is_turn_in )], read_request_keys(&read_requests) ); - let rebuilt_prompt_fragments = registry.context_contributors()[0] - .contribute_thread_context(&session_store, &thread_store) + let unavailable_turn_store = ExtensionData::new("turn-2"); + let unavailable_sections = registry.context_contributors()[0] + .contribute_world_state(WorldStateContributionInput { + thread_id: codex_protocol::ThreadId::new(), + turn_id: "turn-2", + environments: &[], + ready_selected_capability_roots: &[], + session_store: &session_store, + thread_store: &thread_store, + turn_store: &unavailable_turn_store, + }) .await; - assert_eq!(1, rebuilt_prompt_fragments.len()); - assert!(rebuilt_prompt_fragments[0].text().contains("lint-fix")); + let unavailable_snapshot = unavailable_sections[0].snapshot().clone(); + let unavailable_fragment = unavailable_sections[0] + .render_diff(PreviousWorldStateSection::Known(&available_snapshot)) + .ok_or("removed skills should render")?; + assert!( + unavailable_fragment + .body() + .contains("No selected-environment skills") + ); - let next_turn_store = ExtensionData::new("turn-2"); - let next_fragments = registry.turn_input_contributors()[0] - .contribute( - TurnInputContext { - turn_id: "turn-2".to_string(), - user_input: vec![UserInput::Text { - text: "no skill this time".to_string(), - text_elements: Vec::new(), - }], - environments: Vec::new(), - }, - &session_store, - &thread_store, - &next_turn_store, - ) + let restored_turn_store = ExtensionData::new("turn-3"); + let restored_sections = registry.context_contributors()[0] + .contribute_world_state(WorldStateContributionInput { + thread_id: codex_protocol::ThreadId::new(), + turn_id: "turn-3", + environments: &[turn_environment], + ready_selected_capability_roots: &selected_roots, + session_store: &session_store, + thread_store: &thread_store, + turn_store: &restored_turn_store, + }) .await; + let restored_snapshot = restored_sections[0].snapshot().clone(); + let restored_fragment = restored_sections[0] + .render_diff(PreviousWorldStateSection::Known(&unavailable_snapshot)) + .ok_or("restored skills should render")?; + assert!(restored_fragment.body().contains("lint-fix")); + assert_eq!(1, list_calls.load(Ordering::Relaxed)); - assert!(next_fragments.is_empty()); + let mut listing_disabled_config = config.clone(); + listing_disabled_config.include_instructions = false; + registry.config_contributors()[0].on_config_changed( + &session_store, + &thread_store, + &config, + &listing_disabled_config, + ); + let listing_disabled_turn_store = ExtensionData::new("turn-4"); + let listing_disabled_sections = registry.context_contributors()[0] + .contribute_world_state(WorldStateContributionInput { + thread_id: codex_protocol::ThreadId::new(), + turn_id: "turn-4", + environments: &[], + ready_selected_capability_roots: &selected_roots, + session_store: &session_store, + thread_store: &thread_store, + turn_store: &listing_disabled_turn_store, + }) + .await; + let listing_disabled_fragment = listing_disabled_sections[0] + .render_diff(PreviousWorldStateSection::Known(&restored_snapshot)) + .ok_or("disabled skill listing should render")?; + assert_eq!( + "\n## Skills update\nSelected-environment skills are not listed automatically. Explicit skill mentions can still be resolved when available.\n", + listing_disabled_fragment.body() + ); + let mut normalized_listing_disabled_snapshot = listing_disabled_sections[0].snapshot().clone(); + normalized_listing_disabled_snapshot + .as_object_mut() + .ok_or("skills snapshot should be an object")? + .remove("body"); + assert!( + listing_disabled_sections[0] + .render_diff(PreviousWorldStateSection::Known( + &normalized_listing_disabled_snapshot + )) + .is_none() + ); Ok(()) } @@ -488,18 +563,16 @@ async fn root_qualified_locator_selects_only_the_matching_executor_skill() -> Te let registry = builder.build(); let session_store = ExtensionData::new("session"); let thread_store = ExtensionData::new("thread"); - thread_store.insert( - [("root-a", "/skills/root-a"), ("root-b", "/skills/root-b")] - .into_iter() - .map(|(id, path)| SelectedCapabilityRoot { - id: id.to_string(), - location: CapabilityRootLocation::Environment { - environment_id: "env-1".to_string(), - path: PathUri::parse(&format!("file://{path}")).expect("skill root URI"), - }, - }) - .collect::>(), - ); + let selected_roots = [("root-a", "/skills/root-a"), ("root-b", "/skills/root-b")] + .into_iter() + .map(|(id, path)| SelectedCapabilityRoot { + id: id.to_string(), + location: CapabilityRootLocation::Environment { + environment_id: "env-1".to_string(), + path: PathUri::parse(&format!("file://{path}")).expect("skill root URI"), + }, + }) + .collect::>(); let session_source = SessionSource::Cli; let config = default_config(); registry.thread_lifecycle_contributors()[0] @@ -513,6 +586,21 @@ async fn root_qualified_locator_selects_only_the_matching_executor_skill() -> Te }) .await; + let turn_store = ExtensionData::new("turn-1"); + registry.context_contributors()[0] + .contribute_world_state(WorldStateContributionInput { + thread_id: codex_protocol::ThreadId::new(), + turn_id: "turn-1", + environments: &[TurnEnvironmentSelection { + environment_id: "env-1".to_string(), + cwd: PathUri::parse("file:///workspace").expect("cwd URI"), + }], + ready_selected_capability_roots: &selected_roots, + session_store: &session_store, + thread_store: &thread_store, + turn_store: &turn_store, + }) + .await; let fragments = registry.turn_input_contributors()[0] .contribute( TurnInputContext { @@ -525,7 +613,7 @@ async fn root_qualified_locator_selects_only_the_matching_executor_skill() -> Te }, &session_store, &thread_store, - &ExtensionData::new("turn-1"), + &turn_store, ) .await;