From 8fa88fa8cabddb15c6477b84f2d1774ce50b20bd Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 25 Mar 2026 16:14:36 -0700 Subject: [PATCH] Add cached environment manager for exec server URL (#15785) Add environment manager that is a singleton and is created early in app-server (before skill manager, before config loading). Use an environment variable to point to a running exec server. --- codex-rs/Cargo.lock | 2 + codex-rs/app-server/src/in_process.rs | 2 + codex-rs/app-server/src/lib.rs | 3 + codex-rs/app-server/src/message_processor.rs | 4 + .../src/message_processor/tracing_tests.rs | 2 + codex-rs/core/config.schema.json | 4 - codex-rs/core/src/agent/control_tests.rs | 21 +++++ codex-rs/core/src/codex.rs | 9 +- codex-rs/core/src/codex_delegate.rs | 4 + codex-rs/core/src/codex_tests.rs | 7 +- codex-rs/core/src/codex_tests_guardian.rs | 2 + codex-rs/core/src/config/config_tests.rs | 32 ------- codex-rs/core/src/config/mod.rs | 9 -- codex-rs/core/src/memories/tests.rs | 3 + codex-rs/core/src/test_support.rs | 9 +- codex-rs/core/src/thread_manager.rs | 15 ++- codex-rs/core/src/thread_manager_tests.rs | 15 +++ codex-rs/core/tests/common/test_codex.rs | 26 +++--- codex-rs/core/tests/suite/client.rs | 3 + codex-rs/exec-server/src/environment.rs | 93 ++++++++++++++++--- codex-rs/exec-server/src/lib.rs | 2 + .../exec-server/src/remote_file_system.rs | 9 ++ codex-rs/exec-server/src/remote_process.rs | 7 ++ codex-rs/exec-server/tests/exec_process.rs | 2 +- codex-rs/exec-server/tests/file_system.rs | 2 +- codex-rs/mcp-server/Cargo.toml | 1 + codex-rs/mcp-server/src/lib.rs | 6 +- codex-rs/mcp-server/src/message_processor.rs | 5 +- codex-rs/tui/Cargo.toml | 1 + codex-rs/tui/src/app.rs | 3 + justfile | 5 + scripts/run_tui_with_exec_server.sh | 61 ++++++++++++ 32 files changed, 286 insertions(+), 83 deletions(-) create mode 100755 scripts/run_tui_with_exec_server.sh diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index fb1710987..02625d90c 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2323,6 +2323,7 @@ dependencies = [ "anyhow", "codex-arg0", "codex-core", + "codex-exec-server", "codex-features", "codex-protocol", "codex-shell-command", @@ -2724,6 +2725,7 @@ dependencies = [ "codex-client", "codex-cloud-requirements", "codex-core", + "codex-exec-server", "codex-features", "codex-feedback", "codex-file-search", diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 79f32c908..0405c7225 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -77,6 +77,7 @@ use codex_arg0::Arg0DispatchPaths; use codex_core::config::Config; use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::LoaderOverrides; +use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; use codex_protocol::protocol::SessionSource; use tokio::sync::mpsc; @@ -383,6 +384,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { outgoing: Arc::clone(&processor_outgoing), arg0_paths: args.arg0_paths, config: args.config, + environment_manager: Arc::new(EnvironmentManager::from_env()), cli_overrides: args.cli_overrides, loader_overrides: args.loader_overrides, cloud_requirements: args.cloud_requirements, diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 247270a26..0a2b6f386 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -40,6 +40,7 @@ use codex_core::ExecPolicyError; use codex_core::check_execpolicy_for_warnings; use codex_core::config_loader::ConfigLoadError; use codex_core::config_loader::TextRange as CoreTextRange; +use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; use codex_protocol::protocol::SessionSource; use codex_state::log_db; @@ -355,6 +356,7 @@ pub async fn run_main_with_transport( session_source: SessionSource, auth: AppServerWebsocketAuthSettings, ) -> IoResult<()> { + let environment_manager = Arc::new(EnvironmentManager::from_env()); let (transport_event_tx, mut transport_event_rx) = mpsc::channel::(CHANNEL_CAPACITY); let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); @@ -620,6 +622,7 @@ pub async fn run_main_with_transport( outgoing: outgoing_message_sender, arg0_paths, config: Arc::new(config), + environment_manager, cli_overrides, loader_overrides, cloud_requirements: cloud_requirements.clone(), diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index feca70951..9a9a145f6 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -66,6 +66,7 @@ use codex_core::default_client::get_codex_user_agent; use codex_core::default_client::set_default_client_residency_requirement; use codex_core::default_client::set_default_originator; use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig; +use codex_exec_server::EnvironmentManager; use codex_features::Feature; use codex_feedback::CodexFeedback; use codex_login::auth::ExternalAuthRefreshContext; @@ -176,6 +177,7 @@ pub(crate) struct MessageProcessorArgs { pub(crate) outgoing: Arc, pub(crate) arg0_paths: Arg0DispatchPaths, pub(crate) config: Arc, + pub(crate) environment_manager: Arc, pub(crate) cli_overrides: Vec<(String, TomlValue)>, pub(crate) loader_overrides: LoaderOverrides, pub(crate) cloud_requirements: CloudRequirementsLoader, @@ -194,6 +196,7 @@ impl MessageProcessor { outgoing, arg0_paths, config, + environment_manager, cli_overrides, loader_overrides, cloud_requirements, @@ -217,6 +220,7 @@ impl MessageProcessor { .features .enabled(Feature::DefaultModeRequestUserInput), }, + environment_manager, )); auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone()); auth_manager.set_external_auth_refresher(Arc::new(ExternalAuthRefreshBridge { diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index aa66f79f2..d2fe9c23d 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -24,6 +24,7 @@ use codex_core::config::Config; use codex_core::config::ConfigBuilder; use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::LoaderOverrides; +use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::W3cTraceContext; @@ -236,6 +237,7 @@ fn build_test_processor( outgoing, arg0_paths: Arg0DispatchPaths::default(), config, + environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)), cli_overrides: Vec::new(), loader_overrides: LoaderOverrides::default(), cloud_requirements: CloudRequirementsLoader::default(), diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 3e2d7ff68..853f0235e 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -1914,10 +1914,6 @@ "experimental_compact_prompt_file": { "$ref": "#/definitions/AbsolutePathBuf" }, - "experimental_exec_server_url": { - "description": "Experimental / do not use. Overrides the URL used when connecting to a remote exec server.", - "type": "string" - }, "experimental_realtime_start_instructions": { "description": "Experimental / do not use. Replaces the built-in realtime start instructions inserted into developer messages when realtime becomes active.", "type": "string" diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 10e9821ae..878d7d099 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -75,6 +75,9 @@ impl AgentControlHarness { CodexAuth::from_api_key("dummy"), config.model_provider.clone(), config.codex_home.clone(), + std::sync::Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let control = manager.agent_control(); Self { @@ -811,6 +814,9 @@ async fn spawn_agent_respects_max_threads_limit() { CodexAuth::from_api_key("dummy"), config.model_provider.clone(), config.codex_home.clone(), + std::sync::Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let control = manager.agent_control(); @@ -854,6 +860,9 @@ async fn spawn_agent_releases_slot_after_shutdown() { CodexAuth::from_api_key("dummy"), config.model_provider.clone(), config.codex_home.clone(), + std::sync::Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let control = manager.agent_control(); @@ -888,6 +897,9 @@ async fn spawn_agent_limit_shared_across_clones() { CodexAuth::from_api_key("dummy"), config.model_provider.clone(), config.codex_home.clone(), + std::sync::Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let control = manager.agent_control(); let cloned = control.clone(); @@ -924,6 +936,9 @@ async fn resume_agent_respects_max_threads_limit() { CodexAuth::from_api_key("dummy"), config.model_provider.clone(), config.codex_home.clone(), + std::sync::Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let control = manager.agent_control(); @@ -971,6 +986,9 @@ async fn resume_agent_releases_slot_after_resume_failure() { CodexAuth::from_api_key("dummy"), config.model_provider.clone(), config.codex_home.clone(), + std::sync::Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let control = manager.agent_control(); @@ -1361,6 +1379,9 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() { CodexAuth::from_api_key("dummy"), config.model_provider.clone(), config.codex_home.clone(), + std::sync::Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let control = manager.agent_control(); let harness = AgentControlHarness { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index ecc9d3dad..a4bdb2c94 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -57,6 +57,7 @@ use codex_analytics::build_track_events_context; use codex_app_server_protocol::McpServerElicitationRequest; use codex_app_server_protocol::McpServerElicitationRequestParams; use codex_exec_server::Environment; +use codex_exec_server::EnvironmentManager; use codex_features::FEATURES; use codex_features::Feature; use codex_features::unstable_features_warning_event; @@ -402,6 +403,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) config: Config, pub(crate) auth_manager: Arc, pub(crate) models_manager: Arc, + pub(crate) environment_manager: Arc, pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, @@ -455,6 +457,7 @@ impl Codex { mut config, auth_manager, models_manager, + environment_manager, skills_manager, plugins_manager, mcp_manager, @@ -646,6 +649,7 @@ impl Codex { agent_status_tx.clone(), conversation_history, session_source_clone, + environment_manager, skills_manager, plugins_manager, mcp_manager.clone(), @@ -1450,6 +1454,7 @@ impl Session { agent_status: watch::Sender, initial_history: InitialHistory, session_source: SessionSource, + environment_manager: Arc, skills_manager: Arc, plugins_manager: Arc, mcp_manager: Arc, @@ -1884,9 +1889,7 @@ impl Session { code_mode_service: crate::tools::code_mode::CodeModeService::new( config.js_repl_node_path.clone(), ), - environment: Arc::new( - Environment::create(config.experimental_exec_server_url.clone()).await?, - ), + environment: environment_manager.current().await?, }; let js_repl = Arc::new(JsReplHandle::with_node_path( config.js_repl_node_path.clone(), diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index d87dd070d..9f491bbcb 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use async_channel::Receiver; use async_channel::Sender; use codex_async_utils::OrCancelExt; +use codex_exec_server::EnvironmentManager; use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; @@ -76,6 +77,9 @@ pub(crate) async fn run_codex_thread_interactive( config, auth_manager, models_manager, + environment_manager: Arc::new(EnvironmentManager::new( + parent_ctx.environment.exec_server_url().map(str::to_owned), + )), skills_manager: Arc::clone(&parent_session.services.skills_manager), plugins_manager: Arc::clone(&parent_session.services.plugins_manager), mcp_manager: Arc::clone(&parent_session.services.mcp_manager), diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 74aea1296..de6489946 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -2551,6 +2551,9 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { agent_status_tx, InitialHistory::New, SessionSource::Exec, + Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), skills_manager, plugins_manager, mcp_manager, @@ -2644,7 +2647,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone(), true)); let network_approval = Arc::new(NetworkApprovalService::default()); let environment = Arc::new( - codex_exec_server::Environment::create(None) + codex_exec_server::Environment::create(/*exec_server_url*/ None) .await .expect("create environment"), ); @@ -3482,7 +3485,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone(), true)); let network_approval = Arc::new(NetworkApprovalService::default()); let environment = Arc::new( - codex_exec_server::Environment::create(None) + codex_exec_server::Environment::create(/*exec_server_url*/ None) .await .expect("create environment"), ); diff --git a/codex-rs/core/src/codex_tests_guardian.rs b/codex-rs/core/src/codex_tests_guardian.rs index 54f7405be..33b8d15d1 100644 --- a/codex-rs/core/src/codex_tests_guardian.rs +++ b/codex-rs/core/src/codex_tests_guardian.rs @@ -12,6 +12,7 @@ use crate::sandboxing::SandboxPermissions; use crate::tools::context::FunctionToolOutput; use crate::turn_diff_tracker::TurnDiffTracker; use codex_app_server_protocol::ConfigLayerSource; +use codex_exec_server::EnvironmentManager; use codex_execpolicy::Decision; use codex_execpolicy::Evaluation; use codex_execpolicy::RuleMatch; @@ -437,6 +438,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { config, auth_manager, models_manager, + environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)), skills_manager, plugins_manager, mcp_manager, diff --git a/codex-rs/core/src/config/config_tests.rs b/codex-rs/core/src/config/config_tests.rs index f67c53234..34c41d44b 100644 --- a/codex-rs/core/src/config/config_tests.rs +++ b/codex-rs/core/src/config/config_tests.rs @@ -4335,7 +4335,6 @@ fn test_precedence_fixture_with_o3_profile() -> std::io::Result<()> { model_verbosity: None, personality: Some(Personality::Pragmatic), chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), - experimental_exec_server_url: None, realtime_audio: RealtimeAudioConfig::default(), experimental_realtime_start_instructions: None, experimental_realtime_ws_base_url: None, @@ -4478,7 +4477,6 @@ fn test_precedence_fixture_with_gpt3_profile() -> std::io::Result<()> { model_verbosity: None, personality: Some(Personality::Pragmatic), chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), - experimental_exec_server_url: None, realtime_audio: RealtimeAudioConfig::default(), experimental_realtime_start_instructions: None, experimental_realtime_ws_base_url: None, @@ -4619,7 +4617,6 @@ fn test_precedence_fixture_with_zdr_profile() -> std::io::Result<()> { model_verbosity: None, personality: Some(Personality::Pragmatic), chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), - experimental_exec_server_url: None, realtime_audio: RealtimeAudioConfig::default(), experimental_realtime_start_instructions: None, experimental_realtime_ws_base_url: None, @@ -4746,7 +4743,6 @@ fn test_precedence_fixture_with_gpt5_profile() -> std::io::Result<()> { model_verbosity: Some(Verbosity::High), personality: Some(Personality::Pragmatic), chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), - experimental_exec_server_url: None, realtime_audio: RealtimeAudioConfig::default(), experimental_realtime_start_instructions: None, experimental_realtime_ws_base_url: None, @@ -5964,34 +5960,6 @@ experimental_realtime_start_instructions = "start instructions from config" Ok(()) } -#[test] -fn experimental_exec_server_url_loads_from_config_toml() -> std::io::Result<()> { - let cfg: ConfigToml = toml::from_str( - r#" -experimental_exec_server_url = "http://127.0.0.1:8080" -"#, - ) - .expect("TOML deserialization should succeed"); - - assert_eq!( - cfg.experimental_exec_server_url.as_deref(), - Some("http://127.0.0.1:8080") - ); - - let codex_home = TempDir::new()?; - let config = Config::load_from_base_config_with_overrides( - cfg, - ConfigOverrides::default(), - codex_home.path().to_path_buf(), - )?; - - assert_eq!( - config.experimental_exec_server_url.as_deref(), - Some("http://127.0.0.1:8080") - ); - Ok(()) -} - #[test] fn experimental_realtime_ws_base_url_loads_from_config_toml() -> std::io::Result<()> { let cfg: ConfigToml = toml::from_str( diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index e799f1b9b..761e43b93 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -526,10 +526,6 @@ pub struct Config { /// Base URL for requests to ChatGPT (as opposed to the OpenAI API). pub chatgpt_base_url: String, - /// Experimental / do not use. Overrides the URL used when connecting to - /// a remote exec server. - pub experimental_exec_server_url: Option, - /// Machine-local realtime audio device preferences used by realtime voice. pub realtime_audio: RealtimeAudioConfig, @@ -1319,10 +1315,6 @@ pub struct ConfigToml { /// Base URL override for the built-in `openai` model provider. pub openai_base_url: Option, - /// Experimental / do not use. Overrides the URL used when connecting to - /// a remote exec server. - pub experimental_exec_server_url: Option, - /// Machine-local realtime audio device preferences used by realtime voice. #[serde(default)] pub audio: Option, @@ -2678,7 +2670,6 @@ impl Config { .chatgpt_base_url .or(cfg.chatgpt_base_url) .unwrap_or("https://chatgpt.com/backend-api/".to_string()), - experimental_exec_server_url: cfg.experimental_exec_server_url, realtime_audio: cfg .audio .map_or_else(RealtimeAudioConfig::default, |audio| RealtimeAudioConfig { diff --git a/codex-rs/core/src/memories/tests.rs b/codex-rs/core/src/memories/tests.rs index a929401b9..03dbe7185 100644 --- a/codex-rs/core/src/memories/tests.rs +++ b/codex-rs/core/src/memories/tests.rs @@ -484,6 +484,9 @@ mod phase2 { CodexAuth::from_api_key("dummy"), config.model_provider.clone(), config.codex_home.clone(), + std::sync::Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let (mut session, _turn_context) = make_session_and_context().await; session.services.state_db = Some(Arc::clone(&state_db)); diff --git a/codex-rs/core/src/test_support.rs b/codex-rs/core/src/test_support.rs index c2aad83df..075395e3a 100644 --- a/codex-rs/core/src/test_support.rs +++ b/codex-rs/core/src/test_support.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use std::sync::Arc; +use codex_exec_server::EnvironmentManager; use codex_protocol::config_types::CollaborationModeMask; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ModelPreset; @@ -60,8 +61,14 @@ pub fn thread_manager_with_models_provider_and_home( auth: CodexAuth, provider: ModelProviderInfo, codex_home: PathBuf, + environment_manager: Arc, ) -> ThreadManager { - ThreadManager::with_models_provider_and_home_for_tests(auth, provider, codex_home) + ThreadManager::with_models_provider_and_home_for_tests( + auth, + provider, + codex_home, + environment_manager, + ) } pub async fn start_thread_with_user_shell_override( diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 511a6fc97..7f272f959 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -28,6 +28,7 @@ use crate::skills_watcher::SkillsWatcherEvent; use crate::tasks::interrupted_turn_history_marker; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::TurnStatus; +use codex_exec_server::EnvironmentManager; use codex_protocol::ThreadId; use codex_protocol::config_types::CollaborationModeMask; #[cfg(test)] @@ -200,6 +201,7 @@ pub(crate) struct ThreadManagerState { thread_created_tx: broadcast::Sender, auth_manager: Arc, models_manager: Arc, + environment_manager: Arc, skills_manager: Arc, plugins_manager: Arc, mcp_manager: Arc, @@ -215,6 +217,7 @@ impl ThreadManager { auth_manager: Arc, session_source: SessionSource, collaboration_modes_config: CollaborationModesConfig, + environment_manager: Arc, ) -> Self { let codex_home = config.codex_home.clone(); let restriction_product = session_source.restriction_product(); @@ -246,6 +249,7 @@ impl ThreadManager { collaboration_modes_config, openai_models_provider, )), + environment_manager, skills_manager, plugins_manager, mcp_manager, @@ -272,8 +276,12 @@ impl ThreadManager { )); std::fs::create_dir_all(&codex_home) .unwrap_or_else(|err| panic!("temp codex home dir create failed: {err}")); - let mut manager = - Self::with_models_provider_and_home_for_tests(auth, provider, codex_home.clone()); + let mut manager = Self::with_models_provider_and_home_for_tests( + auth, + provider, + codex_home.clone(), + Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)), + ); manager._test_codex_home_guard = Some(TempCodexHomeGuard { path: codex_home }); manager } @@ -284,6 +292,7 @@ impl ThreadManager { auth: CodexAuth, provider: ModelProviderInfo, codex_home: PathBuf, + environment_manager: Arc, ) -> Self { set_thread_manager_test_mode_for_tests(/*enabled*/ true); let auth_manager = AuthManager::from_auth_for_testing(auth); @@ -309,6 +318,7 @@ impl ThreadManager { auth_manager.clone(), provider, )), + environment_manager, skills_manager, plugins_manager, mcp_manager, @@ -844,6 +854,7 @@ impl ThreadManagerState { config, auth_manager, models_manager: Arc::clone(&self.models_manager), + environment_manager: Arc::clone(&self.environment_manager), skills_manager: Arc::clone(&self.skills_manager), plugins_manager: Arc::clone(&self.plugins_manager), mcp_manager: Arc::clone(&self.mcp_manager), diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 64830fa72..c80f5f743 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -244,6 +244,9 @@ async fn shutdown_all_threads_bounded_submits_shutdown_to_every_thread() { CodexAuth::from_api_key("dummy"), config.model_provider.clone(), config.codex_home.clone(), + Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let thread_1 = manager .start_thread(config.clone()) @@ -292,6 +295,9 @@ async fn new_uses_configured_openai_provider_for_model_refresh() { auth_manager, SessionSource::Exec, CollaborationModesConfig::default(), + Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let _ = manager.list_models(RefreshStrategy::Online).await; @@ -419,6 +425,9 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor auth_manager.clone(), SessionSource::Exec, CollaborationModesConfig::default(), + Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let source = manager @@ -516,6 +525,9 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() { auth_manager.clone(), SessionSource::Exec, CollaborationModesConfig::default(), + Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let source = manager @@ -602,6 +614,9 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_ auth_manager.clone(), SessionSource::Exec, CollaborationModesConfig::default(), + Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let source = manager diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index 7b0d41278..5ed5e8a73 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -105,8 +105,7 @@ impl TestEnv { pub async fn local() -> Result { let local_cwd_temp_dir = TempDir::new()?; let cwd = local_cwd_temp_dir.path().to_path_buf(); - let environment = - codex_exec_server::Environment::create(/*experimental_exec_server_url*/ None).await?; + let environment = codex_exec_server::Environment::create(/*exec_server_url*/ None).await?; Ok(Self { environment, cwd, @@ -119,8 +118,8 @@ impl TestEnv { &self.environment } - pub fn experimental_exec_server_url(&self) -> Option<&str> { - self.environment.experimental_exec_server_url() + pub fn exec_server_url(&self) -> Option<&str> { + self.environment.exec_server_url() } } @@ -390,17 +389,17 @@ impl TestCodexBuilder { server: &wiremock::MockServer, ) -> anyhow::Result { let test_env = test_env().await?; - let experimental_exec_server_url = - test_env.experimental_exec_server_url().map(str::to_owned); + let home = match self.home.clone() { + Some(home) => home, + None => Arc::new(TempDir::new()?), + }; + let base_url = format!("{}/v1", server.uri()); let cwd = test_env.cwd.to_path_buf(); self.config_mutators.push(Box::new(move |config| { - config.experimental_exec_server_url = experimental_exec_server_url; config.cwd = cwd.abs(); })); - - let mut test = self.build(server).await?; - test._test_env = test_env; - Ok(test) + let (config, cwd) = self.prepare_config(base_url, &home).await?; + Box::pin(self.build_from_config(config, cwd, home, /*resume_from*/ None, test_env)).await } pub async fn build_with_streaming_server( @@ -479,18 +478,23 @@ impl TestCodexBuilder { test_env: TestEnv, ) -> anyhow::Result { let auth = self.auth.clone(); + let environment_manager = Arc::new(codex_exec_server::EnvironmentManager::new( + test_env.exec_server_url().map(str::to_owned), + )); let thread_manager = if config.model_catalog.is_some() { ThreadManager::new( &config, codex_core::test_support::auth_manager_from_auth(auth.clone()), SessionSource::Exec, CollaborationModesConfig::default(), + Arc::clone(&environment_manager), ) } else { codex_core::test_support::thread_manager_with_models_provider_and_home( auth.clone(), config.model_provider.clone(), config.codex_home.clone(), + Arc::clone(&environment_manager), ) }; let thread_manager = Arc::new(thread_manager); diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 869acc49f..e0e265666 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -829,6 +829,9 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() { .features .enabled(Feature::DefaultModeRequestUserInput), }, + Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), ); let NewThread { thread: codex, .. } = thread_manager .start_thread(config) diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 7cc3f7840..e6f4d540b 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use tokio::sync::OnceCell; + use crate::ExecServerClient; use crate::ExecServerError; use crate::RemoteExecServerConnectArgs; @@ -10,13 +12,49 @@ use crate::process::ExecProcess; use crate::remote_file_system::RemoteFileSystem; use crate::remote_process::RemoteProcess; +pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL"; + pub trait ExecutorEnvironment: Send + Sync { fn get_executor(&self) -> Arc; } +#[derive(Debug, Default)] +pub struct EnvironmentManager { + exec_server_url: Option, + current_environment: OnceCell>, +} + +impl EnvironmentManager { + pub fn new(exec_server_url: Option) -> Self { + Self { + exec_server_url: normalize_exec_server_url(exec_server_url), + current_environment: OnceCell::new(), + } + } + + pub fn from_env() -> Self { + Self::new(std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok()) + } + + pub fn exec_server_url(&self) -> Option<&str> { + self.exec_server_url.as_deref() + } + + pub async fn current(&self) -> Result, ExecServerError> { + self.current_environment + .get_or_try_init(|| async { + Ok(Arc::new( + Environment::create(self.exec_server_url.clone()).await?, + )) + }) + .await + .map(Arc::clone) + } +} + #[derive(Clone)] pub struct Environment { - experimental_exec_server_url: Option, + exec_server_url: Option, remote_exec_server_client: Option, executor: Arc, } @@ -32,7 +70,7 @@ impl Default for Environment { } Self { - experimental_exec_server_url: None, + exec_server_url: None, remote_exec_server_client: None, executor: Arc::new(local_process), } @@ -42,19 +80,15 @@ impl Default for Environment { impl std::fmt::Debug for Environment { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Environment") - .field( - "experimental_exec_server_url", - &self.experimental_exec_server_url, - ) + .field("exec_server_url", &self.exec_server_url) .finish_non_exhaustive() } } impl Environment { - pub async fn create( - experimental_exec_server_url: Option, - ) -> Result { - let remote_exec_server_client = if let Some(url) = &experimental_exec_server_url { + pub async fn create(exec_server_url: Option) -> Result { + let exec_server_url = normalize_exec_server_url(exec_server_url); + let remote_exec_server_client = if let Some(url) = &exec_server_url { Some( ExecServerClient::connect_websocket(RemoteExecServerConnectArgs { websocket_url: url.clone(), @@ -83,14 +117,14 @@ impl Environment { }; Ok(Self { - experimental_exec_server_url, + exec_server_url, remote_exec_server_client, executor, }) } - pub fn experimental_exec_server_url(&self) -> Option<&str> { - self.experimental_exec_server_url.as_deref() + pub fn exec_server_url(&self) -> Option<&str> { + self.exec_server_url.as_deref() } pub fn get_executor(&self) -> Arc { @@ -106,6 +140,13 @@ impl Environment { } } +fn normalize_exec_server_url(exec_server_url: Option) -> Option { + exec_server_url.and_then(|url| { + let url = url.trim(); + (!url.is_empty()).then(|| url.to_string()) + }) +} + impl ExecutorEnvironment for Environment { fn get_executor(&self) -> Arc { Arc::clone(&self.executor) @@ -114,17 +155,39 @@ impl ExecutorEnvironment for Environment { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::Environment; + use super::EnvironmentManager; use pretty_assertions::assert_eq; #[tokio::test] async fn create_without_remote_exec_server_url_does_not_connect() { - let environment = Environment::create(None).await.expect("create environment"); + let environment = Environment::create(/*exec_server_url*/ None) + .await + .expect("create environment"); - assert_eq!(environment.experimental_exec_server_url(), None); + assert_eq!(environment.exec_server_url(), None); assert!(environment.remote_exec_server_client.is_none()); } + #[test] + fn environment_manager_normalizes_empty_url() { + let manager = EnvironmentManager::new(Some(String::new())); + + assert_eq!(manager.exec_server_url(), None); + } + + #[tokio::test] + async fn environment_manager_current_caches_environment() { + let manager = EnvironmentManager::new(/*exec_server_url*/ None); + + let first = manager.current().await.expect("get current environment"); + let second = manager.current().await.expect("get current environment"); + + assert!(Arc::ptr_eq(&first, &second)); + } + #[tokio::test] async fn default_environment_has_ready_local_executor() { let environment = Environment::default(); diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 68ff9f654..8a723041c 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -30,7 +30,9 @@ pub use codex_app_server_protocol::FsRemoveParams; pub use codex_app_server_protocol::FsRemoveResponse; pub use codex_app_server_protocol::FsWriteFileParams; pub use codex_app_server_protocol::FsWriteFileResponse; +pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR; pub use environment::Environment; +pub use environment::EnvironmentManager; pub use environment::ExecutorEnvironment; pub use file_system::CopyOptions; pub use file_system::CreateDirectoryOptions; diff --git a/codex-rs/exec-server/src/remote_file_system.rs b/codex-rs/exec-server/src/remote_file_system.rs index 9711f43e5..27e4fee91 100644 --- a/codex-rs/exec-server/src/remote_file_system.rs +++ b/codex-rs/exec-server/src/remote_file_system.rs @@ -10,6 +10,7 @@ use codex_app_server_protocol::FsRemoveParams; use codex_app_server_protocol::FsWriteFileParams; use codex_utils_absolute_path::AbsolutePathBuf; use tokio::io; +use tracing::trace; use crate::CopyOptions; use crate::CreateDirectoryOptions; @@ -30,6 +31,7 @@ pub(crate) struct RemoteFileSystem { impl RemoteFileSystem { pub(crate) fn new(client: ExecServerClient) -> Self { + trace!("remote fs new"); Self { client } } } @@ -37,6 +39,7 @@ impl RemoteFileSystem { #[async_trait] impl ExecutorFileSystem for RemoteFileSystem { async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult> { + trace!("remote fs read_file"); let response = self .client .fs_read_file(FsReadFileParams { path: path.clone() }) @@ -51,6 +54,7 @@ impl ExecutorFileSystem for RemoteFileSystem { } async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec) -> FileSystemResult<()> { + trace!("remote fs write_file"); self.client .fs_write_file(FsWriteFileParams { path: path.clone(), @@ -66,6 +70,7 @@ impl ExecutorFileSystem for RemoteFileSystem { path: &AbsolutePathBuf, options: CreateDirectoryOptions, ) -> FileSystemResult<()> { + trace!("remote fs create_directory"); self.client .fs_create_directory(FsCreateDirectoryParams { path: path.clone(), @@ -77,6 +82,7 @@ impl ExecutorFileSystem for RemoteFileSystem { } async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult { + trace!("remote fs get_metadata"); let response = self .client .fs_get_metadata(FsGetMetadataParams { path: path.clone() }) @@ -94,6 +100,7 @@ impl ExecutorFileSystem for RemoteFileSystem { &self, path: &AbsolutePathBuf, ) -> FileSystemResult> { + trace!("remote fs read_directory"); let response = self .client .fs_read_directory(FsReadDirectoryParams { path: path.clone() }) @@ -111,6 +118,7 @@ impl ExecutorFileSystem for RemoteFileSystem { } async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()> { + trace!("remote fs remove"); self.client .fs_remove(FsRemoveParams { path: path.clone(), @@ -128,6 +136,7 @@ impl ExecutorFileSystem for RemoteFileSystem { destination_path: &AbsolutePathBuf, options: CopyOptions, ) -> FileSystemResult<()> { + trace!("remote fs copy"); self.client .fs_copy(FsCopyParams { source_path: source_path.clone(), diff --git a/codex-rs/exec-server/src/remote_process.rs b/codex-rs/exec-server/src/remote_process.rs index c34c1fe6a..f6e8109e4 100644 --- a/codex-rs/exec-server/src/remote_process.rs +++ b/codex-rs/exec-server/src/remote_process.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use tokio::sync::broadcast; +use tracing::trace; use crate::ExecProcess; use crate::ExecServerClient; @@ -19,6 +20,7 @@ pub(crate) struct RemoteProcess { impl RemoteProcess { pub(crate) fn new(client: ExecServerClient) -> Self { + trace!("remote process new"); Self { client } } } @@ -26,10 +28,12 @@ impl RemoteProcess { #[async_trait] impl ExecProcess for RemoteProcess { async fn start(&self, params: ExecParams) -> Result { + trace!("remote process start"); self.client.exec(params).await } async fn read(&self, params: ReadParams) -> Result { + trace!("remote process read"); self.client.read(params).await } @@ -38,14 +42,17 @@ impl ExecProcess for RemoteProcess { process_id: &str, chunk: Vec, ) -> Result { + trace!("remote process write"); self.client.write(process_id, chunk).await } async fn terminate(&self, process_id: &str) -> Result { + trace!("remote process terminate"); self.client.terminate(process_id).await } fn subscribe_events(&self) -> broadcast::Receiver { + trace!("remote process subscribe_events"); self.client.event_receiver() } } diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index d72f83b95..954c73103 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -30,7 +30,7 @@ async fn create_process_context(use_remote: bool) -> Result { _server: Some(server), }) } else { - let environment = Environment::create(None).await?; + let environment = Environment::create(/*exec_server_url*/ None).await?; Ok(ProcessContext { process: environment.get_executor(), _server: None, diff --git a/codex-rs/exec-server/tests/file_system.rs b/codex-rs/exec-server/tests/file_system.rs index ed90d7aa9..dea47e8fc 100644 --- a/codex-rs/exec-server/tests/file_system.rs +++ b/codex-rs/exec-server/tests/file_system.rs @@ -36,7 +36,7 @@ async fn create_file_system_context(use_remote: bool) -> Result IoResult<()> { + let environment_manager = Arc::new(EnvironmentManager::from_env()); // Parse CLI overrides once and derive the base Config eagerly so later // components do not need to work with raw TOML values. let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| { @@ -127,7 +130,8 @@ pub async fn run_main( let mut processor = MessageProcessor::new( outgoing_message_sender, arg0_paths, - std::sync::Arc::new(config), + Arc::new(config), + environment_manager, ); async move { while let Some(msg) = incoming_rx.recv().await { diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index e5397e4ca..1fba41164 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use codex_arg0::Arg0DispatchPaths; use codex_core::AuthManager; @@ -7,6 +8,7 @@ use codex_core::config::Config; use codex_core::default_client::USER_AGENT_SUFFIX; use codex_core::default_client::get_codex_user_agent; use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig; +use codex_exec_server::EnvironmentManager; use codex_features::Feature; use codex_protocol::ThreadId; use codex_protocol::protocol::SessionSource; @@ -27,7 +29,6 @@ use rmcp::model::RequestId; use rmcp::model::ServerCapabilities; use rmcp::model::ToolsCapability; use serde_json::json; -use std::sync::Arc; use tokio::sync::Mutex; use tokio::task; @@ -52,6 +53,7 @@ impl MessageProcessor { outgoing: OutgoingMessageSender, arg0_paths: Arg0DispatchPaths, config: Arc, + environment_manager: Arc, ) -> Self { let outgoing = Arc::new(outgoing); let auth_manager = AuthManager::shared( @@ -68,6 +70,7 @@ impl MessageProcessor { .features .enabled(Feature::DefaultModeRequestUserInput), }, + environment_manager, )); Self { outgoing, diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index 01a9c27a7..4613ddcf8 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -37,6 +37,7 @@ codex-chatgpt = { workspace = true } codex-client = { workspace = true } codex-cloud-requirements = { workspace = true } codex-core = { workspace = true } +codex-exec-server = { workspace = true } codex-features = { workspace = true } codex-feedback = { workspace = true } codex-file-search = { workspace = true } diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index a60652027..fe1cb6ae0 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -77,6 +77,7 @@ use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_ use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG; #[cfg(target_os = "windows")] use codex_core::windows_sandbox::WindowsSandboxLevelExt; +use codex_exec_server::EnvironmentManager; use codex_features::Feature; use codex_otel::SessionTelemetry; use codex_otel::TelemetryAuthMode; @@ -2356,6 +2357,7 @@ impl App { let harness_overrides = normalize_harness_overrides_for_cwd(harness_overrides, &config.cwd)?; + let environment_manager = Arc::new(EnvironmentManager::from_env()); let thread_manager = Arc::new(ThreadManager::new( &config, auth_manager.clone(), @@ -2365,6 +2367,7 @@ impl App { .features .enabled(Feature::DefaultModeRequestUserInput), }, + environment_manager, )); let mut model = thread_manager .get_models_manager() diff --git a/justfile b/justfile index 768b71407..5c9fa5e6a 100644 --- a/justfile +++ b/justfile @@ -14,6 +14,11 @@ codex *args: exec *args: cargo run --bin codex -- exec "$@" +# Start codex-exec-server, enable the app-server TUI, and run codex-tui. +[no-cd] +tui-with-exec-server *args: + ./scripts/run_tui_with_exec_server.sh "$@" + # Run the CLI version of the file-search crate. file-search *args: cargo run --bin codex-file-search -- "$@" diff --git a/scripts/run_tui_with_exec_server.sh b/scripts/run_tui_with_exec_server.sh new file mode 100755 index 000000000..8f2fa8013 --- /dev/null +++ b/scripts/run_tui_with_exec_server.sh @@ -0,0 +1,61 @@ +#!/usr/bin/env bash + +set -euo pipefail + +repo_root="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cargo_root="$repo_root/codex-rs" +listen_url="${CODEX_EXEC_SERVER_LISTEN_URL:-ws://127.0.0.1:0}" +start_timeout_seconds="${CODEX_EXEC_SERVER_START_TIMEOUT_SECONDS:-120}" +tmp_dir="$(mktemp -d "${TMPDIR:-/tmp}/codex-tui-with-exec-server.XXXXXX")" +stdout_log="$tmp_dir/exec-server.stdout" +stderr_log="$tmp_dir/exec-server.stderr" +server_pid="" +exec_server_url="" + +cleanup() { + if [[ -n "$server_pid" ]]; then + kill "$server_pid" >/dev/null 2>&1 || true + wait "$server_pid" >/dev/null 2>&1 || true + fi + rm -rf "$tmp_dir" +} + +trap cleanup EXIT INT TERM HUP + +( + cd "$cargo_root" + cargo run -p codex-exec-server -- --listen "$listen_url" +) >"$stdout_log" 2>"$stderr_log" & +server_pid="$!" + +# Wait for the server to print its bound websocket URL before launching the TUI. +for _ in $(seq 1 "$((start_timeout_seconds * 20))"); do + if [[ -s "$stdout_log" ]]; then + exec_server_url="$(head -n 1 "$stdout_log" | tr -d '\r')" + if [[ "$exec_server_url" == ws://* ]]; then + break + fi + fi + + if ! kill -0 "$server_pid" >/dev/null 2>&1; then + cat "$stderr_log" >&2 || true + cat "$stdout_log" >&2 || true + echo "failed to start codex-exec-server" >&2 + exit 1 + fi + + sleep 0.05 +done + +if [[ -z "$exec_server_url" ]]; then + cat "$stderr_log" >&2 || true + cat "$stdout_log" >&2 || true + echo "timed out waiting ${start_timeout_seconds}s for codex-exec-server to report its websocket URL" >&2 + exit 1 +fi + +export CODEX_EXEC_SERVER_URL="$exec_server_url" +echo "Starting codex-tui with CODEX_EXEC_SERVER_URL=$CODEX_EXEC_SERVER_URL" >&2 + +cd "$cargo_root" +cargo run -p codex-tui --bin codex-tui -- -c features.tui_app_server=true -c mcp_oauth_credentials_store=file "$@"