diff --git a/codex-rs/core/src/context/environment_context.rs b/codex-rs/core/src/context/environment_context.rs index e93ffb88f..5f36124b8 100644 --- a/codex-rs/core/src/context/environment_context.rs +++ b/codex-rs/core/src/context/environment_context.rs @@ -1,3 +1,4 @@ +use crate::session::step_context::StepContext; use crate::session::turn_context::TurnContext; use crate::session::turn_context::TurnEnvironment; use crate::shell::Shell; @@ -30,34 +31,50 @@ pub(crate) struct EnvironmentContext { pub(crate) struct EnvironmentContextEnvironment { pub(crate) id: String, pub(crate) cwd: PathUri, - pub(crate) shell: String, + status: EnvironmentContextEnvironmentStatus, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum EnvironmentContextEnvironmentStatus { + Starting, + Ready { shell: String }, } impl EnvironmentContextEnvironment { - fn legacy(cwd: PathUri, shell: String) -> Self { - Self { - id: String::new(), - cwd, - shell, - } - } - fn from_turn_environments(environments: &[TurnEnvironment], shell: &Shell) -> Vec { environments .iter() .map(|environment| Self { id: environment.environment_id.clone(), cwd: environment.cwd().clone(), - shell: environment - .shell - .as_ref() - .map(|shell| shell.name().to_string()) - .unwrap_or_else(|| shell.name().to_string()), + status: EnvironmentContextEnvironmentStatus::Ready { + shell: environment + .shell + .as_ref() + .map(|shell| shell.name().to_string()) + .unwrap_or_else(|| shell.name().to_string()), + }, }) .collect() } } +impl EnvironmentContextEnvironmentStatus { + fn equals_except_shell(&self, other: &Self) -> bool { + matches!( + (self, other), + (Self::Starting, Self::Starting) | (Self::Ready { .. }, Self::Ready { .. }) + ) + } + + fn render(&self, indent: &str) -> String { + match self { + Self::Starting => format!("{indent}starting"), + Self::Ready { shell } => format!("{indent}{shell}"), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum EnvironmentContextEnvironments { None, @@ -81,13 +98,16 @@ impl EnvironmentContextEnvironments { fn equals_except_shell(&self, other: &Self) -> bool { match (self, other) { (Self::None, Self::None) => true, - (Self::Single(left), Self::Single(right)) => left.cwd == right.cwd, + (Self::Single(left), Self::Single(right)) => { + left.cwd == right.cwd && left.status.equals_except_shell(&right.status) + } (Self::Multiple(left), Self::Multiple(right)) => { left.len() == right.len() - && left - .iter() - .zip(right.iter()) - .all(|(left, right)| left.id == right.id && left.cwd == right.cwd) + && left.iter().zip(right.iter()).all(|(left, right)| { + left.id == right.id + && left.cwd == right.cwd + && left.status.equals_except_shell(&right.status) + }) } _ => false, } @@ -365,6 +385,48 @@ impl EnvironmentContext { } } + pub(crate) fn from_step_context(step_context: &StepContext, shell: &Shell) -> Option { + let mut environments = EnvironmentContextEnvironment::from_turn_environments( + &step_context.environments.turn_environments, + shell, + ); + environments.extend( + step_context + .environments + .starting + .iter() + .map(|environment| EnvironmentContextEnvironment { + id: environment.selection.environment_id.clone(), + cwd: environment.selection.cwd.clone(), + status: EnvironmentContextEnvironmentStatus::Starting, + }), + ); + + Self::environment_only(environments) + } + + pub(crate) fn from_attached_environments( + environments: &[TurnEnvironment], + shell: &Shell, + ) -> Option { + Self::environment_only(EnvironmentContextEnvironment::from_turn_environments( + environments, + shell, + )) + } + + fn environment_only(environments: Vec) -> Option { + (!environments.is_empty()).then(|| { + Self::new( + environments, + /*current_date*/ None, + /*timezone*/ None, + /*network*/ None, + /*subagents*/ None, + ) + }) + } + /// Compares two environment contexts, ignoring the shell. Useful when /// comparing turn to turn, since the initial environment_context will /// include the shell, and then it is not configurable from turn to turn. @@ -386,10 +448,11 @@ impl EnvironmentContext { let environments = match &after.environments { EnvironmentContextEnvironments::Single(environment) => { if PathUri::from_abs_path(&before.cwd) != environment.cwd { - EnvironmentContextEnvironments::Single(EnvironmentContextEnvironment::legacy( - environment.cwd.clone(), - environment.shell.clone(), - )) + EnvironmentContextEnvironments::Single(EnvironmentContextEnvironment { + id: String::new(), + cwd: environment.cwd.clone(), + status: environment.status.clone(), + }) } else { EnvironmentContextEnvironments::None } @@ -442,10 +505,11 @@ impl EnvironmentContext { shell: String, ) -> Self { Self::new_with_environments( - EnvironmentContextEnvironments::from_vec(vec![EnvironmentContextEnvironment::legacy( - PathUri::from_abs_path(&turn_context_item.cwd), - shell, - )]), + EnvironmentContextEnvironments::from_vec(vec![EnvironmentContextEnvironment { + id: String::new(), + cwd: PathUri::from_abs_path(&turn_context_item.cwd), + status: EnvironmentContextEnvironmentStatus::Ready { shell }, + }]), turn_context_item.current_date.clone(), turn_context_item.timezone.clone(), Self::network_from_turn_context_item(turn_context_item), @@ -538,7 +602,7 @@ impl ContextualUserFragment for EnvironmentContext { EnvironmentContextEnvironments::Single(environment) => { let cwd = environment.cwd.inferred_native_path_string(); lines.push(format!(" {cwd}")); - lines.push(format!(" {}", environment.shell)); + lines.push(environment.status.render(" ")); } EnvironmentContextEnvironments::Multiple(environments) => { lines.push(" ".to_string()); @@ -546,7 +610,7 @@ impl ContextualUserFragment for EnvironmentContext { lines.push(format!(" ", environment.id)); let cwd = environment.cwd.inferred_native_path_string(); lines.push(format!(" {cwd}")); - lines.push(format!(" {}", environment.shell)); + lines.push(environment.status.render(" ")); lines.push(" ".to_string()); } lines.push(" ".to_string()); diff --git a/codex-rs/core/src/context/environment_context_tests.rs b/codex-rs/core/src/context/environment_context_tests.rs index d40ce4e67..4d9f1a729 100644 --- a/codex-rs/core/src/context/environment_context_tests.rs +++ b/codex-rs/core/src/context/environment_context_tests.rs @@ -1,5 +1,6 @@ use crate::shell::ShellType; +use super::EnvironmentContextEnvironmentStatus::Ready; use super::*; use codex_protocol::models::PermissionProfile; use codex_protocol::permissions::FileSystemAccessMode; @@ -37,7 +38,9 @@ fn serialize_workspace_write_environment_context() { vec![EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&cwd.abs()), - shell: fake_shell_name(), + status: Ready { + shell: fake_shell_name(), + }, }], Some("2026-02-26".to_string()), Some("America/Los_Angeles".to_string()), @@ -64,7 +67,9 @@ fn serialize_environment_context_with_foreign_windows_cwd() { vec![EnvironmentContextEnvironment { id: "remote".to_string(), cwd: PathUri::parse("file:///C:/windows").expect("Windows cwd URI"), - shell: "powershell".to_string(), + status: Ready { + shell: "powershell".to_string(), + }, }], /*current_date*/ None, /*timezone*/ None, @@ -91,7 +96,9 @@ fn serialize_environment_context_with_network() { vec![EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&test_abs_path("/repo")), - shell: fake_shell_name(), + status: Ready { + shell: fake_shell_name(), + }, }], Some("2026-02-26".to_string()), Some("America/Los_Angeles".to_string()), @@ -153,7 +160,9 @@ fn serialize_environment_context_with_full_filesystem_profile() { vec![EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&test_abs_path("/repo")), - shell: fake_shell_name(), + status: Ready { + shell: fake_shell_name(), + }, }], /*current_date*/ None, /*timezone*/ None, @@ -259,7 +268,9 @@ fn equals_except_shell_compares_cwd() { vec![EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&test_abs_path("/repo")), - shell: fake_shell_name(), + status: Ready { + shell: fake_shell_name(), + }, }], /*current_date*/ None, /*timezone*/ None, @@ -270,7 +281,9 @@ fn equals_except_shell_compares_cwd() { vec![EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&test_abs_path("/repo")), - shell: fake_shell_name(), + status: Ready { + shell: fake_shell_name(), + }, }], /*current_date*/ None, /*timezone*/ None, @@ -286,7 +299,9 @@ fn equals_except_shell_compares_cwd_differences() { vec![EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&test_abs_path("/repo1")), - shell: fake_shell_name(), + status: Ready { + shell: fake_shell_name(), + }, }], /*current_date*/ None, /*timezone*/ None, @@ -297,7 +312,9 @@ fn equals_except_shell_compares_cwd_differences() { vec![EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&test_abs_path("/repo2")), - shell: fake_shell_name(), + status: Ready { + shell: fake_shell_name(), + }, }], /*current_date*/ None, /*timezone*/ None, @@ -314,7 +331,9 @@ fn equals_except_shell_ignores_shell() { vec![EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&test_abs_path("/repo")), - shell: "bash".to_string(), + status: Ready { + shell: "bash".to_string(), + }, }], /*current_date*/ None, /*timezone*/ None, @@ -325,7 +344,9 @@ fn equals_except_shell_ignores_shell() { vec![EnvironmentContextEnvironment { id: "other".to_string(), cwd: PathUri::from_abs_path(&test_abs_path("/repo")), - shell: "zsh".to_string(), + status: Ready { + shell: "zsh".to_string(), + }, }], /*current_date*/ None, /*timezone*/ None, @@ -342,7 +363,9 @@ fn serialize_environment_context_with_subagents() { vec![EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&test_abs_path("/repo")), - shell: fake_shell_name(), + status: Ready { + shell: fake_shell_name(), + }, }], Some("2026-02-26".to_string()), Some("America/Los_Angeles".to_string()), @@ -376,12 +399,16 @@ fn serialize_environment_context_with_multiple_selected_environments() { EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&local_cwd.abs()), - shell: "bash".to_string(), + status: Ready { + shell: "bash".to_string(), + }, }, EnvironmentContextEnvironment { id: "remote".to_string(), cwd: PathUri::from_abs_path(&remote_cwd.abs()), - shell: "bash".to_string(), + status: Ready { + shell: "bash".to_string(), + }, }, ], Some("2026-02-26".to_string()), @@ -421,12 +448,16 @@ fn serialize_environment_context_prefers_environment_shell_when_present() { EnvironmentContextEnvironment { id: "local".to_string(), cwd: PathUri::from_abs_path(&local_cwd.abs()), - shell: "powershell".to_string(), + status: Ready { + shell: "powershell".to_string(), + }, }, EnvironmentContextEnvironment { id: "remote".to_string(), cwd: PathUri::from_abs_path(&remote_cwd.abs()), - shell: "cmd".to_string(), + status: Ready { + shell: "cmd".to_string(), + }, }, ], /*current_date*/ None, diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 6ce1c6148..eb6f09424 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -1,3 +1,4 @@ +use crate::context::EnvironmentContext; use crate::context_manager::normalize; use crate::event_mapping::has_non_contextual_dev_message_content; use crate::event_mapping::is_contextual_dev_message_content; @@ -48,6 +49,8 @@ pub(crate) struct ContextManager { /// also clear this when it trims a mixed initial-context developer bundle /// whose non-diff fragments no longer exist in the surviving history. reference_context_item: Option, + /// Environment state most recently appended to model-visible history. + environment_context_baseline: Option, } impl ContextManager { @@ -59,6 +62,7 @@ impl ContextManager { &None, &None, /*model_context_window*/ None, ), reference_context_item: None, + environment_context_baseline: None, } } @@ -78,6 +82,17 @@ impl ContextManager { self.reference_context_item.clone() } + pub(crate) fn update_environment_context_baseline( + &mut self, + context: &EnvironmentContext, + ) -> bool { + if self.environment_context_baseline.as_ref() == Some(context) { + return false; + } + self.environment_context_baseline = Some(context.clone()); + true + } + pub(crate) fn set_token_usage_full(&mut self, context_window: i64) { match &mut self.token_info { Some(info) => info.fill_to_context_window(context_window), @@ -163,12 +178,14 @@ impl ContextManager { // its corresponding counterpart to keep the invariants intact without // running a full normalization pass. normalize::remove_corresponding_for(&mut self.items, &removed); + self.environment_context_baseline = None; } } pub(crate) fn replace(&mut self, items: Vec) { self.items = items; self.history_version = self.history_version.saturating_add(1); + self.environment_context_baseline = None; } /// Replace image content in the last turn if it originated from a tool output. diff --git a/codex-rs/core/src/context_manager/history_tests.rs b/codex-rs/core/src/context_manager/history_tests.rs index 35a9656fe..bbf8d8059 100644 --- a/codex-rs/core/src/context_manager/history_tests.rs +++ b/codex-rs/core/src/context_manager/history_tests.rs @@ -1,4 +1,5 @@ use super::*; +use crate::context::EnvironmentContext; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use codex_protocol::AgentPath; @@ -73,6 +74,20 @@ fn create_history_with_items(items: Vec) -> ContextManager { h } +#[test] +fn environment_context_baseline_deduplicates_until_history_is_replaced() { + let context = + EnvironmentContext::from_turn_context_item(&reference_context_item(), "bash".to_string()); + let mut history = ContextManager::new(); + + assert!(history.update_environment_context_baseline(&context)); + assert!(!history.update_environment_context_baseline(&context)); + + history.replace(Vec::new()); + + assert!(history.update_environment_context_baseline(&context)); +} + fn user_msg(text: &str) -> ResponseItem { ResponseItem::Message { id: None, diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index f3c7a950d..42eb0c29e 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -218,6 +218,7 @@ mod rollout_budget; mod rollout_reconstruction; #[allow(clippy::module_inception)] pub(crate) mod session; +pub(crate) mod step_context; pub(crate) mod time_reminder; mod token_budget; pub(crate) mod turn; @@ -2772,6 +2773,35 @@ impl Session { self.send_raw_response_items(turn_context, items).await; } + pub(crate) async fn record_step_environment_context_if_changed( + &self, + turn_context: &TurnContext, + step_context: &step_context::StepContext, + ) { + if !turn_context.config.include_environment_context { + return; + } + + let shell = self.user_shell(); + let Some(environment_context) = + crate::context::EnvironmentContext::from_step_context(step_context, shell.as_ref()) + else { + return; + }; + let changed = { + let mut state = self.state.lock().await; + state + .history + .update_environment_context_baseline(&environment_context) + }; + if !changed { + return; + } + + let item = ContextualUserFragment::into(environment_context); + self.record_conversation_items(turn_context, &[item]).await; + } + pub(crate) async fn record_inter_agent_communication( &self, turn_context: &TurnContext, @@ -3438,6 +3468,22 @@ impl Session { .await, ); } + let initial_environment_context = if should_inject_full_context + && !context_items.is_empty() + && turn_context.config.include_environment_context + && turn_context + .config + .features + .enabled(Feature::DeferredExecutor) + { + let shell = self.user_shell(); + crate::context::EnvironmentContext::from_attached_environments( + &turn_context.environments.turn_environments, + shell.as_ref(), + ) + } else { + None + }; if !context_items.is_empty() { self.record_conversation_items(turn_context, &context_items) .await; @@ -3451,6 +3497,11 @@ impl Session { // context items. This keeps later runtime diffing aligned with the current turn state. let mut state = self.state.lock().await; state.set_reference_context_item(Some(turn_context_item)); + if let Some(environment_context) = initial_environment_context { + state + .history + .update_environment_context_baseline(&environment_context); + } } pub(crate) async fn update_token_usage_info( diff --git a/codex-rs/core/src/session/step_context.rs b/codex-rs/core/src/session/step_context.rs new file mode 100644 index 000000000..17243ced6 --- /dev/null +++ b/codex-rs/core/src/session/step_context.rs @@ -0,0 +1,7 @@ +use crate::environment_selection::TurnEnvironmentSnapshot; + +/// Request-scoped state that may change between model sampling requests. +#[derive(Debug)] +pub(crate) struct StepContext { + pub(crate) environments: TurnEnvironmentSnapshot, +} diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 041da729e..9b9c7cc66 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -42,6 +42,7 @@ use crate::responses_retry::handle_retryable_response_stream_error; use crate::session::PreviousTurnSettings; use crate::session::TurnInput; use crate::session::session::Session; +use crate::session::step_context::StepContext; use crate::session::turn_context::TurnContext; use crate::stream_events_utils::HandleOutputCtx; use crate::stream_events_utils::TurnItemContributorPolicy; @@ -241,6 +242,21 @@ pub(crate) async fn run_turn( ) .await?; + if turn_context + .config + .features + .enabled(Feature::DeferredExecutor) + { + let step_context = StepContext { + environments: sess.services.turn_environments.snapshot().await, + }; + sess.record_step_environment_context_if_changed( + turn_context.as_ref(), + &step_context, + ) + .await; + } + // Construct the input that we will send to the model. let sampling_request_input: Vec = async { sess.clone_history() diff --git a/codex-rs/core/tests/suite/remote_env.rs b/codex-rs/core/tests/suite/remote_env.rs index 67e5668f4..337ca8d40 100644 --- a/codex-rs/core/tests/suite/remote_env.rs +++ b/codex-rs/core/tests/suite/remote_env.rs @@ -26,6 +26,8 @@ use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::request_permissions::PermissionGrantScope; use codex_protocol::request_permissions::RequestPermissionProfile; use codex_protocol::request_permissions::RequestPermissionsResponse; +use codex_protocol::request_user_input::RequestUserInputAnswer; +use codex_protocol::request_user_input::RequestUserInputResponse; use codex_protocol::user_input::UserInput; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_path_uri::PathUri; @@ -49,9 +51,13 @@ use core_test_support::test_codex::local; use core_test_support::test_codex::test_codex; use core_test_support::test_codex::test_env; use core_test_support::wait_for_event; +use core_test_support::wait_for_event_match; +use futures::SinkExt; +use futures::StreamExt; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; +use std::collections::HashMap; use std::fs; use std::path::PathBuf; use std::process::Command; @@ -59,6 +65,12 @@ use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; use tempfile::TempDir; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::time::timeout; +use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::accept_async; +use tokio_tungstenite::tungstenite::Message; async fn unified_exec_test(server: &wiremock::MockServer) -> Result { let mut builder = test_codex().with_config(|config| { config.use_experimental_unified_exec_tool = true; @@ -301,8 +313,7 @@ async fn explicit_remote_shell_runs_in_remote_cwd() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn deferred_executor_reaches_model_before_remote_environment_is_ready() -> Result<()> { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; +async fn deferred_executor_does_not_duplicate_initial_environment_context() -> Result<()> { let server = start_mock_server().await; let response_mock = mount_sse_once( &server, @@ -313,23 +324,216 @@ async fn deferred_executor_reaches_model_before_remote_environment_is_ready() -> ]), ) .await; + let mut builder = test_codex().with_config(|config| { + assert!(config.features.enable(Feature::DeferredExecutor).is_ok()); + }); + let test = builder.build(&server).await?; + + test.submit_turn("report the environment").await?; + + let user_context = response_mock.single_request().message_input_texts("user"); + assert_eq!( + user_context + .iter() + .filter(|text| text.contains("")) + .count(), + 1 + ); + + Ok(()) +} + +async fn read_exec_server_json(websocket: &mut WebSocketStream) -> Value { + loop { + match timeout(Duration::from_secs(5), websocket.next()) + .await + .expect("websocket read should not time out") + .expect("websocket should stay open") + .expect("websocket frame should read") + { + Message::Text(text) => { + return serde_json::from_str(text.as_ref()).expect("valid JSON-RPC message"); + } + Message::Binary(bytes) => { + return serde_json::from_slice(bytes.as_ref()).expect("valid JSON-RPC message"); + } + Message::Ping(_) | Message::Pong(_) => {} + other => panic!("expected JSON-RPC message, got {other:?}"), + } + } +} + +async fn serve_environment_info(listener: TcpListener) { + let (stream, _) = listener.accept().await.expect("connection"); + let mut websocket = accept_async(stream).await.expect("websocket handshake"); + + let initialize = read_exec_server_json(&mut websocket).await; + assert_eq!(initialize["method"], "initialize"); + websocket + .send(Message::Text( + json!({ + "id": initialize["id"], + "result": { "sessionId": "test-session" } + }) + .to_string() + .into(), + )) + .await + .expect("initialize response"); + let initialized = read_exec_server_json(&mut websocket).await; + assert_eq!(initialized["method"], "initialized"); + + let info = read_exec_server_json(&mut websocket).await; + assert_eq!(info["method"], "environment/info"); + websocket + .send(Message::Text( + json!({ + "id": info["id"], + "result": { "shell": { "name": "zsh", "path": "/bin/zsh" } } + }) + .to_string() + .into(), + )) + .await + .expect("environment info response"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn deferred_executor_updates_model_context_after_startup() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let server = start_mock_server().await; + let user_input_call_id = "wait-for-startup"; + let response_mock = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + user_input_call_id, + "request_user_input", + &json!({ + "questions": [{ + "id": "continue", + "header": "Continue", + "question": "Continue after startup?", + "options": [{ + "label": "Yes (Recommended)", + "description": "Continue the test." + }, { + "label": "No", + "description": "Stop the test." + }] + }] + }) + .to_string(), + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_function_call( + "update-plan", + "update_plan", + &json!({ + "explanation": "Continue after startup.", + "plan": [{"step": "Finish", "status": "completed"}] + }) + .to_string(), + ), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_assistant_message("msg-3", "done"), + ev_completed("resp-3"), + ]), + ], + ) + .await; let mut builder = test_codex() .with_exec_server_url(format!("ws://{}", listener.local_addr()?)) .with_config(|config| { assert!(config.features.enable(Feature::DeferredExecutor).is_ok()); + assert!( + config + .features + .enable(Feature::DefaultModeRequestUserInput) + .is_ok() + ); }); - - let test = tokio::time::timeout(Duration::from_secs(5), builder.build(&server)) + let test = timeout(Duration::from_secs(5), builder.build(&server)) .await .context("thread startup should not wait for the remote environment")??; - tokio::time::timeout( - Duration::from_secs(5), - test.submit_turn("respond before the environment is ready"), - ) - .await - .context("turn should reach the model before the remote environment is ready")??; - response_mock.single_request(); + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "wait for the environment".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + additional_context: Default::default(), + thread_settings: Default::default(), + }) + .await?; + let request = wait_for_event_match(&test.codex, |event| match event { + EventMsg::RequestUserInput(request) => Some(request.clone()), + _ => None, + }) + .await; + + serve_environment_info(listener).await; + test.codex + .submit(Op::UserInputAnswer { + id: request.turn_id, + response: RequestUserInputResponse { + answers: HashMap::from([( + "continue".to_string(), + RequestUserInputAnswer { + answers: vec!["Yes (Recommended)".to_string()], + }, + )]), + }, + }) + .await?; + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + let requests = response_mock.requests(); + assert_eq!(requests.len(), 3); + assert!( + requests[0] + .message_input_texts("user") + .iter() + .any(|text| text.contains("starting")) + ); + let ready_user_context = requests[1].message_input_texts("user"); + assert_eq!( + ready_user_context + .iter() + .filter(|text| text.contains("zsh")) + .count(), + 1 + ); + let final_user_context = requests[2].message_input_texts("user"); + assert_eq!( + final_user_context + .iter() + .filter(|text| text.contains("starting")) + .count(), + 1 + ); + assert_eq!( + final_user_context + .iter() + .filter(|text| text.contains("zsh")) + .count(), + 1 + ); + Ok(()) }