diff --git a/codex-rs/exec-server/src/server/processor.rs b/codex-rs/exec-server/src/server/processor.rs index b2daba7ca..f5c2ba760 100644 --- a/codex-rs/exec-server/src/server/processor.rs +++ b/codex-rs/exec-server/src/server/processor.rs @@ -196,6 +196,7 @@ mod tests { use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_utils_path_uri::PathUri; + use pretty_assertions::assert_eq; use serde::Serialize; use serde::de::DeserializeOwned; use tokio::io::AsyncBufReadExt; @@ -211,9 +212,11 @@ mod tests { use crate::ExecServerRuntimePaths; use crate::ProcessId; use crate::connection::JsonRpcConnection; + use crate::protocol::ENVIRONMENT_INFO_METHOD; use crate::protocol::EXEC_METHOD; use crate::protocol::EXEC_READ_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; + use crate::protocol::EnvironmentInfo; use crate::protocol::ExecParams; use crate::protocol::ExecResponse; use crate::protocol::INITIALIZE_METHOD; @@ -225,6 +228,38 @@ mod tests { use crate::protocol::TerminateResponse; use crate::server::session_registry::SessionRegistry; + #[tokio::test] + async fn connection_accepts_pipelined_scalar_requests() { + let registry = SessionRegistry::new(); + let (mut writer, mut lines, task) = spawn_test_connection(registry, "pipelined-scalar"); + + send_request( + &mut writer, + /*id*/ 1, + INITIALIZE_METHOD, + &InitializeParams { + client_name: "exec-server-test".to_string(), + resume_session_id: None, + }, + ) + .await; + let _: InitializeResponse = read_response(&mut lines, /*expected_id*/ 1).await; + send_notification(&mut writer, INITIALIZED_METHOD, &()).await; + + send_request(&mut writer, /*id*/ 2, ENVIRONMENT_INFO_METHOD, &()).await; + send_request(&mut writer, /*id*/ 3, ENVIRONMENT_INFO_METHOD, &()).await; + + let _: EnvironmentInfo = read_response(&mut lines, /*expected_id*/ 2).await; + let _: EnvironmentInfo = read_response(&mut lines, /*expected_id*/ 3).await; + + drop(writer); + drop(lines); + timeout(Duration::from_secs(1), task) + .await + .expect("processor should exit") + .expect("processor should join"); + } + #[tokio::test] async fn transport_disconnect_detaches_session_during_in_flight_read() { let registry = SessionRegistry::new();