diff --git a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs index 77b9bfcbf..bc0a64ef7 100644 --- a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs +++ b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs @@ -215,6 +215,7 @@ impl ExternalAgentSessionImporter { .unwrap_or_else(|| model_info.get_model_instructions(config.personality)), }, dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), multi_agent_version: Some(MultiAgentVersion::V1), initial_window_id: uuid::Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { diff --git a/codex-rs/app-server/tests/common/rollout.rs b/codex-rs/app-server/tests/common/rollout.rs index ec01dacc4..8d2567c01 100644 --- a/codex-rs/app-server/tests/common/rollout.rs +++ b/codex-rs/app-server/tests/common/rollout.rs @@ -200,6 +200,7 @@ fn create_fake_rollout_with_source_and_parent_thread_id( model_provider: model_provider.map(str::to_string), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, @@ -288,6 +289,7 @@ pub fn create_fake_rollout_with_text_elements( model_provider: model_provider.map(str::to_string), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/app-server/tests/suite/conversation_summary.rs b/codex-rs/app-server/tests/suite/conversation_summary.rs index 7f964d314..63ad47326 100644 --- a/codex-rs/app-server/tests/suite/conversation_summary.rs +++ b/codex-rs/app-server/tests/suite/conversation_summary.rs @@ -131,6 +131,7 @@ async fn get_conversation_summary_by_thread_id_reads_pathless_store_thread() -> originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), multi_agent_version: None, initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { diff --git a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs index 8fdb38e5e..7e0db1b5b 100644 --- a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs +++ b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs @@ -158,6 +158,7 @@ async fn thread_delete_with_non_local_thread_store_does_not_create_local_persist originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), multi_agent_version: None, initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 6b427b0d9..9f6776175 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -1369,6 +1369,7 @@ async fn seed_pathless_store_thread( originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), multi_agent_version: None, initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index ce95f4810..533087a11 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -2070,6 +2070,7 @@ stream_max_retries = 0 model_provider: Some("mock_provider".to_string()), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs index f647dc2cf..231ede47b 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs @@ -218,6 +218,7 @@ async fn thread_unarchive_preserves_pathless_store_metadata() -> Result<()> { originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), multi_agent_version: None, initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { diff --git a/codex-rs/core/src/agent/control/spawn.rs b/codex-rs/core/src/agent/control/spawn.rs index 6919d8c62..c4c40577c 100644 --- a/codex-rs/core/src/agent/control/spawn.rs +++ b/codex-rs/core/src/agent/control/spawn.rs @@ -1,5 +1,6 @@ use super::residency::is_v2_resident_session_source; use super::*; +use codex_extension_api::ExtensionDataInit; const AGENT_NAMES: &str = include_str!("../agent_names.txt"); @@ -433,6 +434,16 @@ impl AgentControl { )) })?; + let selected_capability_roots = parent_history + .items + .iter() + .find_map(|item| { + let RolloutItem::SessionMeta(meta_line) = item else { + return None; + }; + Some(meta_line.meta.selected_capability_roots.clone()) + }) + .unwrap_or_default(); let mut forked_rollout_items = parent_history.items; if let SpawnAgentForkMode::LastNTurns(last_n_turns) = fork_mode { forked_rollout_items = @@ -504,6 +515,8 @@ impl AgentControl { { forked_rollout_items.push(RolloutItem::ResponseItem(subagent_usage_hint_message)); } + let mut thread_extension_init = ExtensionDataInit::new(); + thread_extension_init.insert(selected_capability_roots); state .fork_thread_with_source( @@ -517,6 +530,7 @@ impl AgentControl { inherited_environments, inherited_exec_policy, options.environments.clone(), + thread_extension_init, ) .await } diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index e00a8624f..de8754ad9 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -17,6 +17,8 @@ use codex_features::Feature; use codex_login::AuthManager; use codex_login::CodexAuth; use codex_protocol::AgentPath; +use codex_protocol::capabilities::CapabilityRootLocation; +use codex_protocol::capabilities::SelectedCapabilityRoot; use codex_protocol::config_types::ModeKind; use codex_protocol::models::ContentItem; use codex_protocol::models::MessagePhase; @@ -38,6 +40,7 @@ use codex_thread_store::InMemoryThreadStore; use codex_thread_store::LocalThreadStore; use codex_thread_store::LocalThreadStoreConfig; use codex_thread_store::ThreadStore; +use codex_utils_path_uri::PathUri; use pretty_assertions::assert_eq; use tempfile::TempDir; use tokio::time::Duration; @@ -1446,7 +1449,33 @@ async fn spawn_agent_fork_last_n_turns_keeps_only_recent_turns() { #[tokio::test] async fn spawn_agent_fork_last_n_turns_drops_parent_startup_prefix_when_under_limit() { let harness = AgentControlHarness::new().await; - let (parent_thread_id, parent_thread) = harness.start_thread().await; + let selected_capability_roots = vec![SelectedCapabilityRoot { + id: "demo@1".to_string(), + location: CapabilityRootLocation::Environment { + environment_id: "build".to_string(), + path: PathUri::parse("file:///plugins/demo").expect("plugin root URI"), + }, + }]; + let mut thread_extension_init = ExtensionDataInit::new(); + thread_extension_init.insert(selected_capability_roots.clone()); + let parent = harness + .manager + .start_thread_with_options(StartThreadOptions { + config: harness.config.clone(), + initial_history: InitialHistory::New, + session_source: None, + thread_source: None, + dynamic_tools: Vec::new(), + metrics_service_name: None, + parent_trace: None, + environments: Vec::new(), + thread_extension_init, + supports_openai_form_elicitation: false, + }) + .await + .expect("start parent thread"); + let parent_thread_id = parent.thread_id; + let parent_thread = parent.thread; let startup_turn_context = parent_thread.codex.session.new_default_turn().await; parent_thread .codex @@ -1525,6 +1554,14 @@ async fn spawn_agent_fork_last_n_turns_drops_parent_startup_prefix_when_under_li !history_contains_text(history.raw_items(), "parent startup developer context"), "bounded fork should drop parent startup context even when fewer turns exist than requested" ); + assert_eq!( + &child_thread + .codex + .session + .services + .selected_capability_roots, + &selected_capability_roots + ); assert!( child_thread .codex diff --git a/codex-rs/core/src/environment_selection.rs b/codex-rs/core/src/environment_selection.rs index fa86dded5..ef6a3ec5e 100644 --- a/codex-rs/core/src/environment_selection.rs +++ b/codex-rs/core/src/environment_selection.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::collections::HashSet; use std::fmt; use std::sync::Arc; @@ -227,6 +228,24 @@ pub(crate) struct TurnEnvironmentSnapshot { } impl TurnEnvironmentSnapshot { + /// Maps each captured environment to its exact ready handle, or `None` when it was starting. + pub(crate) fn captured_environments(&self) -> HashMap>> { + self.turn_environments + .iter() + .map(|environment| { + ( + environment.environment_id.clone(), + Some(Arc::clone(&environment.environment)), + ) + }) + .chain( + self.starting + .iter() + .map(|environment| (environment.selection.environment_id.clone(), None)), + ) + .collect() + } + pub(crate) fn primary(&self) -> Option<&TurnEnvironment> { self.turn_environments.first() } diff --git a/codex-rs/core/src/personality_migration_tests.rs b/codex-rs/core/src/personality_migration_tests.rs index dd0da327c..396d06564 100644 --- a/codex-rs/core/src/personality_migration_tests.rs +++ b/codex-rs/core/src/personality_migration_tests.rs @@ -59,6 +59,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R model_provider: None, base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index be12e61e0..94bef646a 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -2828,9 +2828,19 @@ impl Session { .await; } let loaded_agents_md = self.services.agents_md_manager.get_loaded().await; + let selected_capability_roots = self + .services + .turn_environments + .environment_manager() + .resolve_selected_capability_roots( + &self.services.selected_capability_roots, + &environments.captured_environments(), + ) + .await; Arc::new(StepContext::new( turn_context, environments, + selected_capability_roots, loaded_agents_md, )) } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index e43a26bbe..96dffed88 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -10,6 +10,7 @@ use crate::state::ActiveTurn; use codex_extension_api::ExtensionDataInit; use codex_login::auth::AgentIdentityAuthPolicy; use codex_protocol::SessionId; +use codex_protocol::capabilities::SelectedCapabilityRoot; use codex_protocol::config_types::SERVICE_TIER_DEFAULT_REQUEST_VALUE; use codex_protocol::config_types::ServiceTier; use codex_protocol::permissions::FileSystemPath; @@ -555,6 +556,10 @@ impl Session { config.current_time_reminder.as_ref(), external_time_provider, )?; + let selected_capability_roots = thread_extension_init + .get::>() + .map(|roots| roots.as_ref().clone()) + .unwrap_or_else(|| initial_history.get_selected_capability_roots()); let mcp_thread_init = thread_extension_init.clone(); let thread_extension_data = codex_extension_api::ExtensionData::new_with_init( thread_id.to_string(), @@ -584,6 +589,7 @@ impl Session { text: session_configuration.base_instructions.clone(), }, dynamic_tools: session_configuration.dynamic_tools.clone(), + selected_capability_roots: selected_capability_roots.clone(), multi_agent_version: initial_multi_agent_version, initial_window_id: initial_auto_compact_window_ids .window_id @@ -1041,6 +1047,7 @@ impl Session { // TODO(jif): extract session to share between sub-agents session_extension_data, thread_extension_data, + selected_capability_roots, mcp_thread_init, supports_openai_form_elicitation: std::sync::atomic::AtomicBool::new( supports_openai_form_elicitation, diff --git a/codex-rs/core/src/session/step_context.rs b/codex-rs/core/src/session/step_context.rs index bf672db6e..e46301545 100644 --- a/codex-rs/core/src/session/step_context.rs +++ b/codex-rs/core/src/session/step_context.rs @@ -3,12 +3,15 @@ use std::sync::Arc; use crate::agents_md::LoadedAgentsMd; use crate::environment_selection::TurnEnvironmentSnapshot; use crate::session::turn_context::TurnContext; +use codex_exec_server::ResolvedSelectedCapabilityRoot; /// Request-scoped state that may change between model sampling requests. #[derive(Debug)] pub(crate) struct StepContext { pub(crate) turn: Arc, pub(crate) environments: TurnEnvironmentSnapshot, + /// Capability roots bound to ready environments in this exact step. + pub(crate) selected_capability_roots: Vec, /// The canonical AGENTS.md value observed with this environment snapshot. pub(crate) loaded_agents_md: Option>, } @@ -17,11 +20,13 @@ impl StepContext { pub(crate) fn new( turn: Arc, environments: TurnEnvironmentSnapshot, + selected_capability_roots: Vec, loaded_agents_md: Option>, ) -> Self { Self { turn, environments, + selected_capability_roots, loaded_agents_md, } } diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 4553d54a0..525fbc3b9 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -190,6 +190,7 @@ impl StepContext { Arc::new(Self::new( turn, environments, + Vec::new(), /*loaded_agents_md*/ None, )) } @@ -4043,6 +4044,7 @@ async fn attach_thread_persistence(session: &mut Session) -> PathBuf { originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), multi_agent_version: None, initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { @@ -5404,6 +5406,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { agent_control.session_id().to_string(), ), thread_extension_data: codex_extension_api::ExtensionData::new(thread_id.to_string()), + selected_capability_roots: Vec::new(), mcp_thread_init: codex_extension_api::ExtensionDataInit::default(), supports_openai_form_elicitation: std::sync::atomic::AtomicBool::new(false), agent_control, @@ -6919,6 +6922,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() { originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), multi_agent_version: None, initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { @@ -7479,6 +7483,7 @@ where agent_control.session_id().to_string(), ), thread_extension_data: codex_extension_api::ExtensionData::new(thread_id.to_string()), + selected_capability_roots: Vec::new(), mcp_thread_init: codex_extension_api::ExtensionDataInit::default(), supports_openai_form_elicitation: std::sync::atomic::AtomicBool::new(false), agent_control, diff --git a/codex-rs/core/src/session/world_state.rs b/codex-rs/core/src/session/world_state.rs index e19e8c2ff..cedeaab25 100644 --- a/codex-rs/core/src/session/world_state.rs +++ b/codex-rs/core/src/session/world_state.rs @@ -10,6 +10,10 @@ impl Session { step_context: &StepContext, ) -> WorldState { let turn_context = step_context.turn.as_ref(); + tracing::trace!( + selected_capability_root_count = step_context.selected_capability_roots.len(), + "building step world state" + ); let environment_subagents = if turn_context.config.include_environment_context { self.services .agent_control diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index cfed7d90d..3721cb184 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -33,6 +33,7 @@ use codex_login::AuthManager; use codex_mcp::McpConnectionManager; use codex_models_manager::manager::SharedModelsManager; use codex_otel::SessionTelemetry; +use codex_protocol::capabilities::SelectedCapabilityRoot; use codex_rollout::state_db::StateDbHandle; use codex_rollout_trace::ThreadTraceContext; use codex_thread_store::LiveThread; @@ -72,6 +73,9 @@ pub(crate) struct SessionServices { pub(crate) session_extension_data: ExtensionData, pub(crate) thread_extension_data: ExtensionData, pub(crate) supports_openai_form_elicitation: AtomicBool, + /// Raw capability selections for this thread. Each model step resolves them against its + /// current executor environments before using them. + pub(crate) selected_capability_roots: Vec, pub(crate) mcp_thread_init: ExtensionDataInit, pub(crate) agent_control: AgentControl, pub(crate) network_proxy: ArcSwapOption, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 98e13c5d6..b2add0602 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -1402,6 +1402,7 @@ impl ThreadManagerState { inherited_environments: Option, inherited_exec_policy: Option>, environments: Option>, + thread_extension_init: ExtensionDataInit, ) -> CodexResult { let environments = environments.unwrap_or_else(|| { default_thread_environment_selections(self.environment_manager.as_ref(), &config.cwd) @@ -1421,7 +1422,7 @@ impl ThreadManagerState { inherited_exec_policy, /*parent_trace*/ None, environments, - /*thread_extension_init*/ ExtensionDataInit::default(), + thread_extension_init, /*supports_openai_form_elicitation*/ false, /*user_shell_override*/ None, )) diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index f1acc2d97..0d407e19c 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -20,6 +20,8 @@ use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InternalSessionSource; use codex_protocol::protocol::ResumedHistory; +use codex_protocol::protocol::SessionMeta; +use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnStartedEvent; @@ -610,6 +612,71 @@ async fn start_thread_seeds_extension_data_for_mcp_and_lifecycle_contributors() ); } +#[tokio::test] +async fn selected_capability_roots_round_trip_through_fork() { + let temp_dir = tempdir().expect("tempdir"); + let mut config = test_config().await; + config.codex_home = temp_dir.path().join("codex-home").abs(); + config.cwd = config.codex_home.abs(); + std::fs::create_dir_all(&config.codex_home).expect("create codex home"); + + let manager = ThreadManager::with_models_provider_and_home_for_tests( + CodexAuth::from_api_key("dummy"), + config.model_provider.clone(), + config.codex_home.to_path_buf(), + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + ); + let selected_roots = vec![SelectedCapabilityRoot { + id: "demo@1".to_string(), + location: CapabilityRootLocation::Environment { + environment_id: "build".to_string(), + path: PathUri::parse("file:///plugins/demo").expect("plugin root URI"), + }, + }]; + let inherited = manager + .start_thread_with_options(StartThreadOptions { + config, + initial_history: InitialHistory::Forked(vec![RolloutItem::SessionMeta( + SessionMetaLine { + meta: SessionMeta { + selected_capability_roots: selected_roots.clone(), + ..SessionMeta::default() + }, + git: None, + }, + )]), + session_source: None, + thread_source: None, + dynamic_tools: Vec::new(), + metrics_service_name: None, + parent_trace: None, + environments: Vec::new(), + thread_extension_init: Default::default(), + supports_openai_form_elicitation: false, + }) + .await + .expect("start inherited fork"); + inherited.thread.ensure_rollout_materialized().await; + inherited + .thread + .flush_rollout() + .await + .expect("flush inherited fork"); + let inherited_history = RolloutRecorder::get_rollout_history( + &inherited + .thread + .rollout_path() + .expect("inherited fork rollout path"), + ) + .await + .expect("read inherited fork rollout"); + + assert_eq!( + inherited_history.get_selected_capability_roots(), + selected_roots + ); +} + #[tokio::test] async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() { let temp_dir = tempdir().expect("tempdir"); diff --git a/codex-rs/core/src/tools/spec_plan_tests.rs b/codex-rs/core/src/tools/spec_plan_tests.rs index c422cc749..6fc5adcb1 100644 --- a/codex-rs/core/src/tools/spec_plan_tests.rs +++ b/codex-rs/core/src/tools/spec_plan_tests.rs @@ -689,6 +689,7 @@ async fn environment_tools_follow_the_step_context() { let step_context = Arc::new(StepContext::new( Arc::new(turn), environments, + Vec::new(), /*loaded_agents_md*/ None, )); diff --git a/codex-rs/core/tests/suite/personality_migration.rs b/codex-rs/core/tests/suite/personality_migration.rs index ca1d14872..3dd0f3c01 100644 --- a/codex-rs/core/tests/suite/personality_migration.rs +++ b/codex-rs/core/tests/suite/personality_migration.rs @@ -75,6 +75,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R model_provider: None, base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, @@ -127,6 +128,7 @@ async fn write_rollout_with_meta_only(dir: &Path, thread_id: ThreadId) -> io::Re model_provider: None, base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 5eea5a09e..1c2f08262 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -372,6 +372,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { model_provider: None, base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 664ef730d..66f738110 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -54,7 +54,7 @@ pub const CODEX_EXEC_SERVER_NOISE_CHATGPT_ACCOUNT_ID_ENV_VAR: &str = #[derive(Debug)] pub struct EnvironmentManager { default_environment: Option, - environments: RwLock>>, + pub(super) environments: RwLock>>, local_environment: Option>, local_runtime_paths: Option, } @@ -574,6 +574,25 @@ impl Environment { } } + /// Starts the initial connection after an environment is actually selected for use. + pub(crate) fn start_connecting_for_use(environment: &Arc) { + if environment.remote_client.is_none() { + return; + } + let mut startup_task = environment + .startup_task + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if startup_task.is_none() { + let environment = Arc::clone(environment); + *startup_task = Some(AbortOnDropHandle::new(tokio::spawn(async move { + if let Err(error) = environment.wait_until_ready().await { + tracing::debug!(%error, "exec-server environment startup failed"); + } + }))); + } + } + /// Returns whether initial startup has either succeeded or permanently failed. pub fn startup_finished(&self) -> bool { self.remote_client diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 5b2bfa7e5..92c7f85ed 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -22,6 +22,7 @@ mod relay_proto; mod remote; mod remote_file_system; mod remote_process; +mod resolved_capability; mod rpc; mod runtime_paths; mod sandboxed_file_system; @@ -143,6 +144,7 @@ pub use protocol::WriteResponse; pub use protocol::WriteStatus; pub use remote::RemoteEnvironmentConfig; pub use remote::run_remote_environment; +pub use resolved_capability::ResolvedSelectedCapabilityRoot; pub use runtime_paths::ExecServerRuntimePaths; pub use server::DEFAULT_LISTEN_URL; pub use server::ExecServerListenUrlParseError; diff --git a/codex-rs/exec-server/src/resolved_capability.rs b/codex-rs/exec-server/src/resolved_capability.rs new file mode 100644 index 000000000..2902cf9ee --- /dev/null +++ b/codex-rs/exec-server/src/resolved_capability.rs @@ -0,0 +1,114 @@ +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + +use codex_protocol::capabilities::CapabilityRootLocation; +use codex_protocol::capabilities::SelectedCapabilityRoot; + +use crate::Environment; +use crate::EnvironmentManager; +use crate::ExecutorFileSystem; + +/// A selected capability root paired with its currently ready environment handle. +/// +/// Environment IDs have stable identity and contents. This process-local value must not be +/// persisted: it only keeps the current connection handle alive while one model step uses the +/// stable environment. +#[derive(Clone)] +pub struct ResolvedSelectedCapabilityRoot { + selected_root: SelectedCapabilityRoot, + environment: Arc, +} + +impl ResolvedSelectedCapabilityRoot { + pub fn selected_root(&self) -> &SelectedCapabilityRoot { + &self.selected_root + } + + pub fn environment(&self) -> &Arc { + &self.environment + } + + pub fn file_system(&self) -> Arc { + self.environment.get_filesystem() + } +} + +impl EnvironmentManager { + /// Resolves selected roots whose stable environments are ready for the current model step. + /// + /// Environment identity comes from the selected root's stable environment ID. A ready + /// environment captured for the step carries its exact process-local handle so readiness and + /// execution cannot come from different registry snapshots. Missing, starting, or failed + /// environments are omitted. A lazy environment is started for a later step. + pub async fn resolve_selected_capability_roots( + &self, + selected_roots: &[SelectedCapabilityRoot], + captured_environments: &HashMap>>, + ) -> Vec { + let candidates = { + let environments = self + .environments + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner); + selected_roots + .iter() + .filter_map(|selected_root| { + let CapabilityRootLocation::Environment { environment_id, .. } = + &selected_root.location; + let (environment, already_ready) = + match captured_environments.get(environment_id) { + Some(Some(environment)) => (Arc::clone(environment), true), + Some(None) => return None, + None => (Arc::clone(environments.get(environment_id)?), false), + }; + Some(( + ResolvedSelectedCapabilityRoot { + selected_root: selected_root.clone(), + environment, + }, + already_ready, + )) + }) + .collect::>() + }; + + let mut readiness = HashMap::new(); + for (candidate, already_ready) in &candidates { + let CapabilityRootLocation::Environment { environment_id, .. } = + &candidate.selected_root().location; + if readiness.contains_key(environment_id) { + continue; + } + let environment = candidate.environment(); + let ready = if *already_ready { + true + } else if environment.startup_finished() { + environment.wait_until_ready().await.is_ok() + } else { + Environment::start_connecting_for_use(environment); + false + }; + readiness.insert(environment_id.clone(), ready); + } + + candidates + .into_iter() + .map(|(candidate, _)| candidate) + .filter(|candidate| { + let CapabilityRootLocation::Environment { environment_id, .. } = + &candidate.selected_root().location; + readiness.get(environment_id).copied().unwrap_or(false) + }) + .collect() + } +} + +impl fmt::Debug for ResolvedSelectedCapabilityRoot { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_struct("ResolvedSelectedCapabilityRoot") + .field("selected_root", &self.selected_root) + .finish_non_exhaustive() + } +} diff --git a/codex-rs/exec-server/tests/selected_capability_roots.rs b/codex-rs/exec-server/tests/selected_capability_roots.rs new file mode 100644 index 000000000..639d652ca --- /dev/null +++ b/codex-rs/exec-server/tests/selected_capability_roots.rs @@ -0,0 +1,69 @@ +#![cfg(unix)] + +mod common; + +use std::collections::HashMap; +use std::sync::Arc; + +use codex_exec_server::EnvironmentManager; +use codex_protocol::capabilities::CapabilityRootLocation; +use codex_protocol::capabilities::SelectedCapabilityRoot; +use codex_utils_path_uri::PathUri; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn selected_capability_roots_use_captured_handle_after_replacement() -> anyhow::Result<()> { + let mut executor = exec_server().await?; + let manager = EnvironmentManager::without_environments(); + let selected_root = SelectedCapabilityRoot { + id: "demo@1".to_string(), + location: CapabilityRootLocation::Environment { + environment_id: "tools".to_string(), + path: PathUri::parse("file:///plugins/demo")?, + }, + }; + + manager.upsert_environment( + "tools".to_string(), + executor.websocket_url().to_string(), + /*connect_timeout*/ None, + )?; + let environment_a = manager + .get_environment("tools") + .expect("executor A should be registered"); + environment_a.wait_until_ready().await?; + + let unavailable = manager + .resolve_selected_capability_roots( + std::slice::from_ref(&selected_root), + &HashMap::from([("tools".to_string(), None)]), + ) + .await; + assert!(unavailable.is_empty()); + + let captured_environments = + HashMap::from([("tools".to_string(), Some(Arc::clone(&environment_a)))]); + // Replace only the process-local handle; the stable environment ID and executor stay the same. + manager.upsert_environment( + "tools".to_string(), + executor.websocket_url().to_string(), + /*connect_timeout*/ None, + )?; + + let available = manager + .resolve_selected_capability_roots( + std::slice::from_ref(&selected_root), + &captured_environments, + ) + .await; + let [resolved] = available.as_slice() else { + anyhow::bail!("selected root should resolve through its stable environment"); + }; + + assert_eq!(resolved.selected_root(), &selected_root); + assert!(Arc::ptr_eq(resolved.environment(), &environment_a)); + + executor.shutdown().await?; + Ok(()) +} diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 1affdf0d2..42b01b16f 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -19,6 +19,7 @@ use crate::AgentPath; use crate::SessionId; use crate::ThreadId; use crate::approvals::ElicitationRequestEvent; +use crate::capabilities::SelectedCapabilityRoot; use crate::config_types::ApprovalsReviewer; use crate::config_types::CollaborationMode; use crate::config_types::ModeKind; @@ -2570,6 +2571,12 @@ impl InitialHistory { } } + pub fn get_selected_capability_roots(&self) -> Vec { + self.get_session_meta() + .map(|meta| meta.selected_capability_roots.clone()) + .unwrap_or_default() + } + pub fn get_multi_agent_version(&self) -> Option { match self { InitialHistory::New | InitialHistory::Cleared => None, @@ -3003,6 +3010,9 @@ pub struct SessionMeta { skip_serializing_if = "Option::is_none" )] pub dynamic_tools: Option>, + /// Capability roots selected for this thread by the hosting platform. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub selected_capability_roots: Vec, #[serde(skip_serializing_if = "Option::is_none")] pub memory_mode: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -3032,6 +3042,7 @@ impl Default for SessionMeta { model_provider: None, base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/rollout/src/compression_tests.rs b/codex-rs/rollout/src/compression_tests.rs index a4db9a965..733e6cc5f 100644 --- a/codex-rs/rollout/src/compression_tests.rs +++ b/codex-rs/rollout/src/compression_tests.rs @@ -472,6 +472,7 @@ fn write_rollout(path: &std::path::Path, thread_id: ThreadId, message: &str) -> model_provider: None, base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/rollout/src/metadata_tests.rs b/codex-rs/rollout/src/metadata_tests.rs index 0744e7780..c0aea248c 100644 --- a/codex-rs/rollout/src/metadata_tests.rs +++ b/codex-rs/rollout/src/metadata_tests.rs @@ -49,6 +49,7 @@ async fn extract_metadata_from_rollout_uses_session_meta() { model_provider: Some("openai".to_string()), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, @@ -106,6 +107,7 @@ async fn extract_metadata_from_rollout_returns_latest_memory_mode() { model_provider: Some("openai".to_string()), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, @@ -375,6 +377,7 @@ fn write_rollout_in_sessions_with_cwd( model_provider: Some("test-provider".to_string()), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 3908604c7..58d5c7666 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -12,6 +12,7 @@ use std::sync::Mutex; use chrono::SecondsFormat; use codex_protocol::SessionId; use codex_protocol::ThreadId; +use codex_protocol::capabilities::SelectedCapabilityRoot; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::models::BaseInstructions; use serde_json::Value; @@ -91,6 +92,7 @@ pub enum RolloutRecorderParams { originator: String, base_instructions: BaseInstructions, dynamic_tools: Vec, + selected_capability_roots: Vec, multi_agent_version: Option, initial_window_id: Option, }, @@ -182,6 +184,7 @@ impl RolloutRecorderParams { originator, base_instructions, dynamic_tools, + selected_capability_roots: Vec::new(), multi_agent_version: None, initial_window_id: None, } @@ -194,6 +197,20 @@ impl RolloutRecorderParams { self } + pub fn with_selected_capability_roots( + mut self, + selected_capability_roots: Vec, + ) -> Self { + if let Self::Create { + selected_capability_roots: roots, + .. + } = &mut self + { + *roots = selected_capability_roots; + } + self + } + pub fn with_multi_agent_version( mut self, multi_agent_version: Option, @@ -732,6 +749,7 @@ impl RolloutRecorder { originator, base_instructions, dynamic_tools, + selected_capability_roots, multi_agent_version, initial_window_id, } => { @@ -769,6 +787,7 @@ impl RolloutRecorder { } else { Some(dynamic_tools) }, + selected_capability_roots, memory_mode: (!config.generate_memories()).then_some("disabled".to_string()), multi_agent_version, context_window: initial_window_id.map(SessionContextWindow::new), diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 4b72ff767..71ec252c2 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -101,6 +101,7 @@ async fn state_db_init_backfills_before_returning() -> anyhow::Result<()> { model_provider: None, base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/rollout/src/session_index_tests.rs b/codex-rs/rollout/src/session_index_tests.rs index 7f57c2ba0..be39fb06a 100644 --- a/codex-rs/rollout/src/session_index_tests.rs +++ b/codex-rs/rollout/src/session_index_tests.rs @@ -41,6 +41,7 @@ fn write_rollout_with_metadata(path: &Path, thread_id: ThreadId) -> std::io::Res model_provider: Some("test-provider".into()), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/rollout/src/state_db_tests.rs b/codex-rs/rollout/src/state_db_tests.rs index 5712b65ce..bb341aa85 100644 --- a/codex-rs/rollout/src/state_db_tests.rs +++ b/codex-rs/rollout/src/state_db_tests.rs @@ -174,6 +174,7 @@ fn write_rollout_with_user_message( model_provider: Some("test-provider".to_string()), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/rollout/src/tests.rs b/codex-rs/rollout/src/tests.rs index e6a3c728e..901461cf5 100644 --- a/codex-rs/rollout/src/tests.rs +++ b/codex-rs/rollout/src/tests.rs @@ -1288,6 +1288,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> { model_provider: Some("test-provider".into()), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index e5ae07839..81de0cc39 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -343,6 +343,7 @@ mod tests { model_provider: Some("openai".to_string()), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, @@ -535,6 +536,7 @@ mod tests { model_provider: Some("openai".to_string()), base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index d1c2c9076..e5ab6fb31 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -2148,6 +2148,7 @@ mod tests { model_provider: None, base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: Some("polluted".to_string()), multi_agent_version: None, context_window: None, @@ -2211,6 +2212,7 @@ mod tests { model_provider: None, base_instructions: None, dynamic_tools: None, + selected_capability_roots: Vec::new(), memory_mode: None, multi_agent_version: None, context_window: None, diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index dfed2ffac..d64510f22 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -126,6 +126,7 @@ mod tests { originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), multi_agent_version: None, initial_window_id: uuid::Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { @@ -281,6 +282,7 @@ impl InMemoryThreadStore { model_provider: Some(params.metadata.model_provider.clone()), base_instructions: Some(params.base_instructions.clone()), dynamic_tools: (!params.dynamic_tools.is_empty()).then(|| params.dynamic_tools.clone()), + selected_capability_roots: params.selected_capability_roots.clone(), memory_mode: matches!(params.metadata.memory_mode, ThreadMemoryMode::Disabled) .then_some("disabled".to_string()), multi_agent_version: params.multi_agent_version, diff --git a/codex-rs/thread-store/src/local/create_thread.rs b/codex-rs/thread-store/src/local/create_thread.rs index eba6a5a6a..451931b4d 100644 --- a/codex-rs/thread-store/src/local/create_thread.rs +++ b/codex-rs/thread-store/src/local/create_thread.rs @@ -38,6 +38,7 @@ pub(super) async fn create_thread( params.dynamic_tools, ) .with_session_id(params.session_id) + .with_selected_capability_roots(params.selected_capability_roots) .with_multi_agent_version(params.multi_agent_version) .with_initial_window_id(params.initial_window_id), ) diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index 51e622fc5..717ae09b6 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -1133,6 +1133,7 @@ mod tests { originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), multi_agent_version: None, initial_window_id: uuid::Uuid::now_v7().to_string(), metadata: thread_metadata(), diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 1f761cea9..f96a4df31 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -5,6 +5,7 @@ use chrono::DateTime; use chrono::Utc; use codex_protocol::SessionId; use codex_protocol::ThreadId; +use codex_protocol::capabilities::SelectedCapabilityRoot; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::models::BaseInstructions; use codex_protocol::models::PermissionProfile; @@ -85,6 +86,9 @@ pub struct CreateThreadParams { pub base_instructions: BaseInstructions, /// Dynamic tools available to the thread at startup. pub dynamic_tools: Vec, + /// Environment-qualified capability roots selected for this thread. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub selected_capability_roots: Vec, /// Multi-agent runtime selected when the thread was created. pub multi_agent_version: Option, /// Initial context-window identity captured when the thread was created.