From ef24ef127f9a0948a8a00381fcad8407906abba5 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 19 May 2026 08:39:45 -0700 Subject: [PATCH] [codex] Allow empty turn/start requests (#23409) ## Why `turn/start` already accepts an input array on the wire, including an empty array, but core treated empty input as a no-op before the turn could reach the model. App-server clients need to be able to start a real turn even when there is no new user message, for example to let the model proceed from existing thread context. ## What changed - Removed the `run_turn` early return that skipped empty-input turns when there was no pending input. - Kept empty active-turn steering rejected by moving the `steer_input` empty-input check until after core has determined whether there is an active regular turn. - Empty regular turns now refresh `previous_turn_settings` like other regular turns, so follow-up context injection state advances consistently. - Added an app-server v2 integration test proving `turn/start` with `input: []` emits started/completed notifications, sends one Responses request, and does not synthesize an empty user message. ## Validation - `cargo test -p codex-app-server --test all turn_start_with_empty_input_runs_model_request` --- .../app-server/tests/suite/v2/turn_start.rs | 101 ++++++++++++++++++ codex-rs/core/src/session/mod.rs | 8 +- codex-rs/core/src/session/turn.rs | 19 +--- 3 files changed, 110 insertions(+), 18 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 62f0725ca..156c0b208 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -200,6 +200,107 @@ async fn received_response_input_images(server: &wiremock::MockServer) -> Result Ok(input_images) } +#[tokio::test] +async fn turn_start_with_empty_input_runs_model_request() -> Result<()> { + let responses = vec![create_final_assistant_message_sse_response("Done")?]; + let server = create_mock_responses_server_sequence_unchecked(responses).await; + + let codex_home = TempDir::new()?; + create_config_toml( + codex_home.path(), + &server.uri(), + "never", + &BTreeMap::default(), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + thread_source: Some(ThreadSource::User), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: Vec::new(), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + assert!(!turn.id.is_empty()); + + let started_notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/started"), + ) + .await??; + let started: TurnStartedNotification = + serde_json::from_value(started_notif.params.expect("params must be present"))?; + assert_eq!(started.thread_id, thread.id); + assert_eq!(started.turn.id, turn.id); + assert_eq!(started.turn.status, TurnStatus::InProgress); + + let completed_notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + let completed: TurnCompletedNotification = serde_json::from_value( + completed_notif + .params + .expect("turn/completed params must be present"), + )?; + assert_eq!(completed.thread_id, thread.id); + assert_eq!(completed.turn.id, turn.id); + assert_eq!(completed.turn.status, TurnStatus::Completed); + + let requests = server + .received_requests() + .await + .context("failed to fetch received requests")?; + let response_requests = requests + .iter() + .filter(|request| request.url.path().ends_with("/responses")) + .collect::>(); + assert_eq!(response_requests.len(), 1); + let body = response_requests[0] + .body_json::() + .context("request body should be JSON")?; + let input = body + .get("input") + .and_then(Value::as_array) + .context("request body should include input array")?; + assert!( + !input.iter().any(|item| { + item.get("type").and_then(Value::as_str) == Some("message") + && item.get("role").and_then(Value::as_str) == Some("user") + && item + .get("content") + .and_then(Value::as_array) + .is_some_and(Vec::is_empty) + }), + "empty turn/start should not synthesize an empty user message: {input:?}" + ); + + Ok(()) +} + #[tokio::test] async fn turn_start_sends_originator_header() -> Result<()> { let responses = vec![create_final_assistant_message_sse_response("Done")?]; diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index ddab7752f..9609a9d1b 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -3143,10 +3143,6 @@ impl Session { expected_turn_id: Option<&str>, responsesapi_client_metadata: Option>, ) -> Result { - if input.is_empty() { - return Err(SteerInputError::EmptyInput); - } - let mut active = self.active_turn.lock().await; let Some(active_turn) = active.as_mut() else { return Err(SteerInputError::NoActiveTurn(input)); @@ -3180,6 +3176,10 @@ impl Session { None => return Err(SteerInputError::NoActiveTurn(input)), } + if input.is_empty() { + return Err(SteerInputError::EmptyInput); + } + if let Some(responsesapi_client_metadata) = responsesapi_client_metadata && let Some((_, active_task)) = active_turn.tasks.first() { diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 7b1a59f61..c4ad5ebbc 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -143,10 +143,6 @@ pub(crate) async fn run_turn( prewarmed_client_session: Option, cancellation_token: CancellationToken, ) -> Option { - if input.is_empty() && !sess.input_queue.has_pending_input(&sess.active_turn).await { - return None; - } - let mut client_session = prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session()); // TODO(ccunningham): Pre-turn compaction runs before context updates and the @@ -210,16 +206,11 @@ pub(crate) async fn run_turn( sess.merge_connector_selection(explicitly_enabled_connectors.clone()) .await; record_additional_contexts(&sess, &turn_context, additional_contexts).await; - if !input.is_empty() { - // Track the previous-turn baseline from the regular user-turn path only so - // standalone tasks (compact/shell/review) cannot suppress future - // model/realtime injections. - sess.set_previous_turn_settings(Some(PreviousTurnSettings { - model: turn_context.model_info.slug.clone(), - realtime_active: Some(turn_context.realtime_active), - })) - .await; - } + sess.set_previous_turn_settings(Some(PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(turn_context.realtime_active), + })) + .await; for response_item in injection_items { sess.record_conversation_items(&turn_context, std::slice::from_ref(&response_item)) .await;