[codex] Allow empty turn/start requests (#23409)

## Why

`turn/start` already accepts an input array on the wire, including an
empty array, but core treated empty input as a no-op before the turn
could reach the model. App-server clients need to be able to start a
real turn even when there is no new user message, for example to let the
model proceed from existing thread context.

## What changed

- Removed the `run_turn` early return that skipped empty-input turns
when there was no pending input.
- Kept empty active-turn steering rejected by moving the `steer_input`
empty-input check until after core has determined whether there is an
active regular turn.
- Empty regular turns now refresh `previous_turn_settings` like other
regular turns, so follow-up context injection state advances
consistently.
- Added an app-server v2 integration test proving `turn/start` with
`input: []` emits started/completed notifications, sends one Responses
request, and does not synthesize an empty user message.

## Validation

- `cargo test -p codex-app-server --test all
turn_start_with_empty_input_runs_model_request`
This commit is contained in:
pakrym-oai
2026-05-19 08:39:45 -07:00
committed by GitHub
Unverified
parent b3ae3de405
commit ef24ef127f
3 changed files with 110 additions and 18 deletions
@@ -200,6 +200,107 @@ async fn received_response_input_images(server: &wiremock::MockServer) -> Result
Ok(input_images)
}
#[tokio::test]
async fn turn_start_with_empty_input_runs_model_request() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::default(),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
thread_source: Some(ThreadSource::User),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: Vec::new(),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
assert!(!turn.id.is_empty());
let started_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
let started: TurnStartedNotification =
serde_json::from_value(started_notif.params.expect("params must be present"))?;
assert_eq!(started.thread_id, thread.id);
assert_eq!(started.turn.id, turn.id);
assert_eq!(started.turn.status, TurnStatus::InProgress);
let completed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let completed: TurnCompletedNotification = serde_json::from_value(
completed_notif
.params
.expect("turn/completed params must be present"),
)?;
assert_eq!(completed.thread_id, thread.id);
assert_eq!(completed.turn.id, turn.id);
assert_eq!(completed.turn.status, TurnStatus::Completed);
let requests = server
.received_requests()
.await
.context("failed to fetch received requests")?;
let response_requests = requests
.iter()
.filter(|request| request.url.path().ends_with("/responses"))
.collect::<Vec<_>>();
assert_eq!(response_requests.len(), 1);
let body = response_requests[0]
.body_json::<Value>()
.context("request body should be JSON")?;
let input = body
.get("input")
.and_then(Value::as_array)
.context("request body should include input array")?;
assert!(
!input.iter().any(|item| {
item.get("type").and_then(Value::as_str) == Some("message")
&& item.get("role").and_then(Value::as_str) == Some("user")
&& item
.get("content")
.and_then(Value::as_array)
.is_some_and(Vec::is_empty)
}),
"empty turn/start should not synthesize an empty user message: {input:?}"
);
Ok(())
}
#[tokio::test]
async fn turn_start_sends_originator_header() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
+4 -4
View File
@@ -3143,10 +3143,6 @@ impl Session {
expected_turn_id: Option<&str>,
responsesapi_client_metadata: Option<HashMap<String, String>>,
) -> Result<String, SteerInputError> {
if input.is_empty() {
return Err(SteerInputError::EmptyInput);
}
let mut active = self.active_turn.lock().await;
let Some(active_turn) = active.as_mut() else {
return Err(SteerInputError::NoActiveTurn(input));
@@ -3180,6 +3176,10 @@ impl Session {
None => return Err(SteerInputError::NoActiveTurn(input)),
}
if input.is_empty() {
return Err(SteerInputError::EmptyInput);
}
if let Some(responsesapi_client_metadata) = responsesapi_client_metadata
&& let Some((_, active_task)) = active_turn.tasks.first()
{
+5 -14
View File
@@ -143,10 +143,6 @@ pub(crate) async fn run_turn(
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
if input.is_empty() && !sess.input_queue.has_pending_input(&sess.active_turn).await {
return None;
}
let mut client_session =
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
// TODO(ccunningham): Pre-turn compaction runs before context updates and the
@@ -210,16 +206,11 @@ pub(crate) async fn run_turn(
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
.await;
record_additional_contexts(&sess, &turn_context, additional_contexts).await;
if !input.is_empty() {
// Track the previous-turn baseline from the regular user-turn path only so
// standalone tasks (compact/shell/review) cannot suppress future
// model/realtime injections.
sess.set_previous_turn_settings(Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
}))
.await;
}
sess.set_previous_turn_settings(Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
}))
.await;
for response_item in injection_items {
sess.record_conversation_items(&turn_context, std::slice::from_ref(&response_item))
.await;