diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index 521489268..aaaa7076f 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -152,6 +152,7 @@ fn exec_server_params_for_request( env_policy, env, tty, + pipe_stdin: false, arg0: request.arg0.clone(), } } diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index 0047449d3..78b92e1a7 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -85,6 +85,7 @@ Request params: "PATH": "/usr/bin:/bin" }, "tty": true, + "pipeStdin": false, "arg0": null } ``` @@ -95,8 +96,8 @@ Field definitions: - `argv`: command vector. It must be non-empty. - `cwd`: absolute working directory used for the child process. - `env`: environment variables passed to the child process. -- `tty`: when `true`, spawn a PTY-backed interactive process; when `false`, - spawn a pipe-backed process with closed stdin. +- `tty`: when `true`, spawn a PTY-backed interactive process. +- `pipeStdin`: when `true`, keep non-PTY stdin writable via `process/write`. - `arg0`: optional argv0 override forwarded to `codex-utils-pty`. Response: @@ -111,7 +112,7 @@ Behavior notes: - Reusing an existing `processId` is rejected. - PTY-backed processes accept later writes through `process/write`. -- Pipe-backed processes are launched with stdin closed and reject writes. +- Non-PTY processes reject writes unless `pipeStdin` is `true`. - Output is streamed asynchronously via `process/output`. - Exit is reported asynchronously via `process/exited`. @@ -153,7 +154,7 @@ Response: ### `process/write` -Writes raw bytes to a running PTY-backed process stdin. +Writes raw bytes to a running process stdin. Request params: @@ -177,7 +178,7 @@ Response: Behavior notes: - Writes to an unknown `processId` are rejected. -- Writes to a non-PTY process are rejected because stdin is already closed. +- Writes to a non-PTY process are rejected unless it started with `pipeStdin`. ### `process/terminate` @@ -325,7 +326,7 @@ Initialize: Start a process: ```json -{"id":2,"method":"process/start","params":{"processId":"proc-1","argv":["bash","-lc","printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"],"cwd":"/tmp","env":{"PATH":"/usr/bin:/bin"},"tty":true,"arg0":null}} +{"id":2,"method":"process/start","params":{"processId":"proc-1","argv":["bash","-lc","printf 'ready\\n'; while IFS= read -r line; do printf 'echo:%s\\n' \"$line\"; done"],"cwd":"/tmp","env":{"PATH":"/usr/bin:/bin"},"tty":true,"pipeStdin":false,"arg0":null}} {"id":2,"result":{"processId":"proc-1"}} {"method":"process/output","params":{"processId":"proc-1","seq":1,"stream":"stdout","chunk":"cmVhZHkK"}} ``` diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 77ead87a8..afe072019 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -346,6 +346,7 @@ mod tests { env_policy: None, env: Default::default(), tty: false, + pipe_stdin: false, arg0: None, }) .await diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs index bf38aa360..2608f892b 100644 --- a/codex-rs/exec-server/src/local_process.rs +++ b/codex-rs/exec-server/src/local_process.rs @@ -59,6 +59,7 @@ struct RetainedOutputChunk { struct RunningProcess { session: ExecCommandSession, tty: bool, + pipe_stdin: bool, output: VecDeque, retained_bytes: usize, next_seq: u64, @@ -165,6 +166,15 @@ impl LocalProcess { TerminalSize::default(), ) .await + } else if params.pipe_stdin { + codex_utils_pty::spawn_pipe_process( + program, + args, + params.cwd.as_path(), + &env, + ¶ms.arg0, + ) + .await } else { codex_utils_pty::spawn_pipe_process_no_stdin( program, @@ -195,6 +205,7 @@ impl LocalProcess { ProcessEntry::Running(Box::new(RunningProcess { session: spawned.session, tty: params.tty, + pipe_stdin: params.pipe_stdin, output: VecDeque::new(), retained_bytes: 0, next_seq: 1, @@ -339,7 +350,7 @@ impl LocalProcess { status: WriteStatus::Starting, }); }; - if !process.tty { + if !process.tty && !process.pipe_stdin { return Ok(WriteResponse { status: WriteStatus::StdinClosed, }); @@ -667,6 +678,7 @@ mod tests { env_policy: None, env, tty: false, + pipe_stdin: false, arg0: None, } } diff --git a/codex-rs/exec-server/src/protocol.rs b/codex-rs/exec-server/src/protocol.rs index 5d2934889..9f4234e64 100644 --- a/codex-rs/exec-server/src/protocol.rs +++ b/codex-rs/exec-server/src/protocol.rs @@ -69,6 +69,9 @@ pub struct ExecParams { pub env_policy: Option, pub env: HashMap, pub tty: bool, + /// Keep non-tty stdin writable through `process/write`. + #[serde(default)] + pub pipe_stdin: bool, pub arg0: Option, } diff --git a/codex-rs/exec-server/src/server/handler/tests.rs b/codex-rs/exec-server/src/server/handler/tests.rs index 321bf243a..7f2cdba62 100644 --- a/codex-rs/exec-server/src/server/handler/tests.rs +++ b/codex-rs/exec-server/src/server/handler/tests.rs @@ -30,6 +30,7 @@ fn exec_params_with_argv(process_id: &str, argv: Vec) -> ExecParams { env_policy: None, env: inherited_path_env(), tty: false, + pipe_stdin: false, arg0: None, } } diff --git a/codex-rs/exec-server/src/server/processor.rs b/codex-rs/exec-server/src/server/processor.rs index 87b970450..1153bc83e 100644 --- a/codex-rs/exec-server/src/server/processor.rs +++ b/codex-rs/exec-server/src/server/processor.rs @@ -393,6 +393,7 @@ mod tests { env_policy: None, env, tty: false, + pipe_stdin: false, arg0: None, } } diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index 72f029231..862e4e1c8 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -12,6 +12,7 @@ use codex_exec_server::ExecProcess; use codex_exec_server::ProcessId; use codex_exec_server::ReadResponse; use codex_exec_server::StartedExecProcess; +use codex_exec_server::WriteStatus; use pretty_assertions::assert_eq; use test_case::test_case; use tokio::sync::watch; @@ -54,6 +55,7 @@ async fn assert_exec_process_starts_and_exits(use_remote: bool) -> Result<()> { env_policy: /*env_policy*/ None, env: Default::default(), tty: false, + pipe_stdin: false, arg0: None, }) .await?; @@ -131,6 +133,7 @@ async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> { env_policy: /*env_policy*/ None, env: Default::default(), tty: false, + pipe_stdin: false, arg0: None, }) .await?; @@ -164,6 +167,7 @@ async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> { env_policy: /*env_policy*/ None, env: Default::default(), tty: true, + pipe_stdin: false, arg0: None, }) .await?; @@ -184,6 +188,73 @@ async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> { Ok(()) } +async fn assert_exec_process_write_then_read_without_tty(use_remote: bool) -> Result<()> { + let context = create_process_context(use_remote).await?; + let process_id = "proc-stdin-pipe".to_string(); + let session = context + .backend + .start(ExecParams { + process_id: process_id.clone().into(), + argv: vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "IFS= read line; printf 'from-stdin:%s\\n' \"$line\"".to_string(), + ], + cwd: std::env::current_dir()?, + env_policy: /*env_policy*/ None, + env: Default::default(), + tty: false, + pipe_stdin: true, + arg0: None, + }) + .await?; + assert_eq!(session.process.process_id().as_str(), process_id); + + tokio::time::sleep(Duration::from_millis(200)).await; + let write_response = session.process.write(b"hello\n".to_vec()).await?; + assert_eq!(write_response.status, WriteStatus::Accepted); + let StartedExecProcess { process } = session; + let wake_rx = process.subscribe_wake(); + let actual = collect_process_output_from_reads(process, wake_rx).await?; + + assert_eq!(actual, ("from-stdin:hello\n".to_string(), Some(0), true)); + Ok(()) +} + +async fn assert_exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Result<()> { + let context = create_process_context(use_remote).await?; + let process_id = "proc-stdin-closed".to_string(); + let session = context + .backend + .start(ExecParams { + process_id: process_id.clone().into(), + argv: vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "sleep 0.3; if IFS= read -r line; then printf 'read:%s\\n' \"$line\"; else printf 'eof\\n'; fi".to_string(), + ], + cwd: std::env::current_dir()?, + env_policy: /*env_policy*/ None, + env: Default::default(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await?; + assert_eq!(session.process.process_id().as_str(), process_id); + + let write_response = session.process.write(b"ignored\n".to_vec()).await?; + assert_eq!(write_response.status, WriteStatus::StdinClosed); + let StartedExecProcess { process } = session; + let wake_rx = process.subscribe_wake(); + let (output, exit_code, closed) = collect_process_output_from_reads(process, wake_rx).await?; + + assert_eq!(output, "eof\n"); + assert_eq!(exit_code, Some(0)); + assert!(closed); + Ok(()) +} + async fn assert_exec_process_preserves_queued_events_before_subscribe( use_remote: bool, ) -> Result<()> { @@ -201,6 +272,7 @@ async fn assert_exec_process_preserves_queued_events_before_subscribe( env_policy: /*env_policy*/ None, env: Default::default(), tty: false, + pipe_stdin: false, arg0: None, }) .await?; @@ -234,6 +306,7 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> { env_policy: /*env_policy*/ None, env: Default::default(), tty: false, + pipe_stdin: false, arg0: None, }) .await?; @@ -289,6 +362,24 @@ async fn exec_process_write_then_read(use_remote: bool) -> Result<()> { assert_exec_process_write_then_read(use_remote).await } +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// Serialize tests that launch a real exec-server process through the full CLI. +#[serial_test::serial(remote_exec_server)] +async fn exec_process_write_then_read_without_tty(use_remote: bool) -> Result<()> { + assert_exec_process_write_then_read_without_tty(use_remote).await +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// Serialize tests that launch a real exec-server process through the full CLI. +#[serial_test::serial(remote_exec_server)] +async fn exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Result<()> { + assert_exec_process_rejects_write_without_pipe_stdin(use_remote).await +} + #[test_case(false ; "local")] #[test_case(true ; "remote")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/codex-rs/exec-server/tests/process.rs b/codex-rs/exec-server/tests/process.rs index cc96e6ef8..334e44922 100644 --- a/codex-rs/exec-server/tests/process.rs +++ b/codex-rs/exec-server/tests/process.rs @@ -10,6 +10,8 @@ use codex_exec_server::InitializeResponse; use codex_exec_server::ProcessId; use codex_exec_server::ReadResponse; use codex_exec_server::TerminateResponse; +use codex_exec_server::WriteResponse; +use codex_exec_server::WriteStatus; use common::exec_server::exec_server; use pretty_assertions::assert_eq; @@ -47,6 +49,7 @@ async fn exec_server_starts_process_over_websocket() -> anyhow::Result<()> { "cwd": std::env::current_dir()?, "env": {}, "tty": false, + "pipeStdin": false, "arg0": null }), ) @@ -75,6 +78,99 @@ async fn exec_server_starts_process_over_websocket() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_defaults_omitted_pipe_stdin_to_closed_stdin() -> anyhow::Result<()> { + let mut server = exec_server().await?; + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + resume_session_id: None, + })?, + ) + .await?; + let _ = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id + ) + }) + .await?; + + server + .send_notification("initialized", serde_json::json!({})) + .await?; + + let process_start_id = server + .send_request( + "process/start", + serde_json::json!({ + "processId": "proc-default-stdin", + "argv": [ + "/bin/sh", + "-c", + "sleep 0.3; if IFS= read -r line; then printf 'read:%s\\n' \"$line\"; else printf 'eof\\n'; fi" + ], + "cwd": std::env::current_dir()?, + "env": {}, + "tty": false, + "arg0": null + }), + ) + .await?; + let response = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_start_id + ) + }) + .await?; + let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else { + panic!("expected process/start response"); + }; + let process_start_response: ExecResponse = serde_json::from_value(result)?; + assert_eq!( + process_start_response, + ExecResponse { + process_id: ProcessId::from("proc-default-stdin") + } + ); + + let write_id = server + .send_request( + "process/write", + serde_json::json!({ + "processId": "proc-default-stdin", + "chunk": "aWdub3JlZAo=" + }), + ) + .await?; + let response = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &write_id + ) + }) + .await?; + let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else { + panic!("expected process/write response"); + }; + let write_response: WriteResponse = serde_json::from_value(result)?; + assert_eq!( + write_response, + WriteResponse { + status: WriteStatus::StdinClosed + } + ); + + server.shutdown().await?; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn exec_server_resumes_detached_session_without_killing_processes() -> anyhow::Result<()> { let mut server = exec_server().await?; @@ -113,6 +209,7 @@ async fn exec_server_resumes_detached_session_without_killing_processes() -> any "cwd": std::env::current_dir()?, "env": {}, "tty": false, + "pipeStdin": false, "arg0": null }), )