diff --git a/codex-rs/core/src/unified_exec/process.rs b/codex-rs/core/src/unified_exec/process.rs index ba5352ff6..9fc81a6ba 100644 --- a/codex-rs/core/src/unified_exec/process.rs +++ b/codex-rs/core/src/unified_exec/process.rs @@ -5,6 +5,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use tokio::sync::Mutex; use tokio::sync::Notify; +use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::oneshot::error::TryRecvError; use tokio::task::JoinHandle; @@ -47,6 +48,7 @@ pub(crate) struct OutputHandles { #[derive(Debug)] pub(crate) struct UnifiedExecProcess { process_handle: ExecCommandSession, + output_rx: broadcast::Receiver>, output_buffer: OutputBuffer, output_notify: Arc, output_closed: Arc, @@ -72,6 +74,7 @@ impl UnifiedExecProcess { let cancellation_token = CancellationToken::new(); let output_drained = Arc::new(Notify::new()); let mut receiver = initial_output_rx; + let output_rx = receiver.resubscribe(); let buffer_clone = Arc::clone(&output_buffer); let notify_clone = Arc::clone(&output_notify); let output_closed_clone = Arc::clone(&output_closed); @@ -97,6 +100,7 @@ impl UnifiedExecProcess { Self { process_handle, + output_rx, output_buffer, output_notify, output_closed, @@ -124,7 +128,7 @@ impl UnifiedExecProcess { } pub(super) fn output_receiver(&self) -> tokio::sync::broadcast::Receiver> { - self.process_handle.output_receiver() + self.output_rx.resubscribe() } pub(super) fn cancellation_token(&self) -> CancellationToken { @@ -214,9 +218,11 @@ impl UnifiedExecProcess { ) -> Result { let SpawnedPty { session: process_handle, - output_rx, + stdout_rx, + stderr_rx, mut exit_rx, } = spawned; + let output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx); let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle); let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed)); diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index 4ba125009..770379afa 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -543,6 +543,7 @@ impl UnifiedExecProcessManager { env.cwd.as_path(), &env.env, &env.arg0, + codex_utils_pty::TerminalSize::default(), ) .await } else { diff --git a/codex-rs/tui/tests/suite/model_availability_nux.rs b/codex-rs/tui/tests/suite/model_availability_nux.rs index a76ef02bb..512db9797 100644 --- a/codex-rs/tui/tests/suite/model_availability_nux.rs +++ b/codex-rs/tui/tests/suite/model_availability_nux.rs @@ -124,13 +124,20 @@ trust_level = "trusted" &repo_root, &env, &None, + codex_utils_pty::TerminalSize::default(), ) .await?; let mut output = Vec::new(); - let mut output_rx = spawned.output_rx; - let mut exit_rx = spawned.exit_rx; - let writer_tx = spawned.session.writer_sender(); + let codex_utils_pty::SpawnedProcess { + session, + stdout_rx, + stderr_rx, + exit_rx, + } = spawned; + let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx); + let mut exit_rx = exit_rx; + let writer_tx = session.writer_sender(); let interrupt_writer = writer_tx.clone(); let interrupt_task = tokio::spawn(async move { sleep(Duration::from_secs(2)).await; @@ -165,7 +172,7 @@ trust_level = "trusted" Ok(Ok(code)) => code, Ok(Err(err)) => return Err(err.into()), Err(_) => { - spawned.session.terminate(); + session.terminate(); anyhow::bail!("timed out waiting for codex resume to exit"); } }; diff --git a/codex-rs/tui/tests/suite/no_panic_on_startup.rs b/codex-rs/tui/tests/suite/no_panic_on_startup.rs index 3984e771d..6b9a36969 100644 --- a/codex-rs/tui/tests/suite/no_panic_on_startup.rs +++ b/codex-rs/tui/tests/suite/no_panic_on_startup.rs @@ -71,12 +71,19 @@ async fn run_codex_cli( cwd.as_ref(), &env, &None, + codex_utils_pty::TerminalSize::default(), ) .await?; let mut output = Vec::new(); - let mut output_rx = spawned.output_rx; - let mut exit_rx = spawned.exit_rx; - let writer_tx = spawned.session.writer_sender(); + let codex_utils_pty::SpawnedProcess { + session, + stdout_rx, + stderr_rx, + exit_rx, + } = spawned; + let mut output_rx = codex_utils_pty::combine_output_receivers(stdout_rx, stderr_rx); + let mut exit_rx = exit_rx; + let writer_tx = session.writer_sender(); let exit_code_result = timeout(Duration::from_secs(10), async { // Read PTY output until the process exits while replying to cursor // position queries so the TUI can initialize without a real terminal. @@ -103,7 +110,7 @@ async fn run_codex_cli( Ok(Ok(code)) => code, Ok(Err(err)) => return Err(err.into()), Err(_) => { - spawned.session.terminate(); + session.terminate(); anyhow::bail!("timed out waiting for codex CLI to exit"); } }; diff --git a/codex-rs/utils/pty/README.md b/codex-rs/utils/pty/README.md index d0f77268a..e70d7bc6a 100644 --- a/codex-rs/utils/pty/README.md +++ b/codex-rs/utils/pty/README.md @@ -4,22 +4,27 @@ Lightweight helpers for spawning interactive processes either under a PTY (pseud ## API surface -- `spawn_pty_process(program, args, cwd, env, arg0)` → `SpawnedProcess` +- `spawn_pty_process(program, args, cwd, env, arg0, size)` → `SpawnedProcess` - `spawn_pipe_process(program, args, cwd, env, arg0)` → `SpawnedProcess` - `spawn_pipe_process_no_stdin(program, args, cwd, env, arg0)` → `SpawnedProcess` +- `combine_output_receivers(stdout_rx, stderr_rx)` → `broadcast::Receiver>` - `conpty_supported()` → `bool` (Windows only; always true elsewhere) +- `TerminalSize { rows, cols }` selects PTY dimensions in character cells. - `ProcessHandle` exposes: - `writer_sender()` → `mpsc::Sender>` (stdin) - - `output_receiver()` → `broadcast::Receiver>` (stdout/stderr merged) + - `resize(TerminalSize)` + - `close_stdin()` - `has_exited()`, `exit_code()`, `terminate()` -- `SpawnedProcess` bundles `handle`, `output_rx`, and `exit_rx` (oneshot exit code). +- `SpawnedProcess` bundles `session`, `stdout_rx`, `stderr_rx`, and `exit_rx` (oneshot exit code). ## Usage examples ```rust use std::collections::HashMap; use std::path::Path; +use codex_utils_pty::combine_output_receivers; use codex_utils_pty::spawn_pty_process; +use codex_utils_pty::TerminalSize; # tokio_test::block_on(async { let env_map: HashMap = std::env::vars().collect(); @@ -29,13 +34,14 @@ let spawned = spawn_pty_process( Path::new("."), &env_map, &None, + TerminalSize::default(), ).await?; let writer = spawned.session.writer_sender(); writer.send(b"exit\n".to_vec()).await?; // Collect output until the process exits. -let mut output_rx = spawned.output_rx; +let mut output_rx = combine_output_receivers(spawned.stdout_rx, spawned.stderr_rx); let mut collected = Vec::new(); while let Ok(chunk) = output_rx.try_recv() { collected.extend_from_slice(&chunk); diff --git a/codex-rs/utils/pty/src/lib.rs b/codex-rs/utils/pty/src/lib.rs index 590770e2a..0eba23e3f 100644 --- a/codex-rs/utils/pty/src/lib.rs +++ b/codex-rs/utils/pty/src/lib.rs @@ -7,14 +7,20 @@ mod tests; #[cfg(windows)] mod win; +pub const DEFAULT_OUTPUT_BYTES_CAP: usize = 1024 * 1024; + /// Spawn a non-interactive process using regular pipes for stdin/stdout/stderr. pub use pipe::spawn_process as spawn_pipe_process; /// Spawn a non-interactive process using regular pipes, but close stdin immediately. pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin; +/// Combine stdout/stderr receivers into a single broadcast receiver. +pub use process::combine_output_receivers; /// Handle for interacting with a spawned process (PTY or pipe). pub use process::ProcessHandle; -/// Bundle of process handles plus output and exit receivers returned by spawn helpers. +/// Bundle of process handles plus split output and exit receivers returned by spawn helpers. pub use process::SpawnedProcess; +/// Terminal size in character cells used for PTY spawn and resize operations. +pub use process::TerminalSize; /// Backwards-compatible alias for ProcessHandle. pub type ExecCommandSession = ProcessHandle; /// Backwards-compatible alias for SpawnedProcess. diff --git a/codex-rs/utils/pty/src/pipe.rs b/codex-rs/utils/pty/src/pipe.rs index d77c11383..f4b6d68a4 100644 --- a/codex-rs/utils/pty/src/pipe.rs +++ b/codex-rs/utils/pty/src/pipe.rs @@ -13,7 +13,6 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio::process::Command; -use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::task::JoinHandle; @@ -73,7 +72,7 @@ fn kill_process(pid: u32) -> io::Result<()> { } } -async fn read_output_stream(mut reader: R, output_tx: broadcast::Sender>) +async fn read_output_stream(mut reader: R, output_tx: mpsc::Sender>) where R: AsyncRead + Unpin, { @@ -82,7 +81,7 @@ where match reader.read(&mut buf).await { Ok(0) => break, Ok(n) => { - let _ = output_tx.send(buf[..n].to_vec()); + let _ = output_tx.send(buf[..n].to_vec()).await; } Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, Err(_) => break, @@ -157,9 +156,8 @@ async fn spawn_process_with_stdin_mode( let stderr = child.stderr.take(); let (writer_tx, mut writer_rx) = mpsc::channel::>(128); - let (output_tx, _) = broadcast::channel::>(256); - let initial_output_rx = output_tx.subscribe(); - + let (stdout_tx, stdout_rx) = mpsc::channel::>(128); + let (stderr_tx, stderr_rx) = mpsc::channel::>(128); let writer_handle = if let Some(stdin) = stdin { let writer = Arc::new(tokio::sync::Mutex::new(stdin)); tokio::spawn(async move { @@ -175,15 +173,15 @@ async fn spawn_process_with_stdin_mode( }; let stdout_handle = stdout.map(|stdout| { - let output_tx = output_tx.clone(); + let stdout_tx = stdout_tx.clone(); tokio::spawn(async move { - read_output_stream(BufReader::new(stdout), output_tx).await; + read_output_stream(BufReader::new(stdout), stdout_tx).await; }) }); let stderr_handle = stderr.map(|stderr| { - let output_tx = output_tx.clone(); + let stderr_tx = stderr_tx.clone(); tokio::spawn(async move { - read_output_stream(BufReader::new(stderr), output_tx).await; + read_output_stream(BufReader::new(stderr), stderr_tx).await; }) }); let mut reader_abort_handles = Vec::new(); @@ -219,10 +217,8 @@ async fn spawn_process_with_stdin_mode( let _ = exit_tx.send(code); }); - let (handle, output_rx) = ProcessHandle::new( + let handle = ProcessHandle::new( writer_tx, - output_tx, - initial_output_rx, Box::new(PipeChildTerminator { #[cfg(windows)] pid, @@ -240,12 +236,13 @@ async fn spawn_process_with_stdin_mode( Ok(SpawnedProcess { session: handle, - output_rx, + stdout_rx, + stderr_rx, exit_rx, }) } -/// Spawn a process using regular pipes (no PTY), returning handles for stdin, output, and exit. +/// Spawn a process using regular pipes (no PTY), returning handles for stdin, split output, and exit. pub async fn spawn_process( program: &str, args: &[String], diff --git a/codex-rs/utils/pty/src/process.rs b/codex-rs/utils/pty/src/process.rs index 5c487fd38..d7a0addc3 100644 --- a/codex-rs/utils/pty/src/process.rs +++ b/codex-rs/utils/pty/src/process.rs @@ -4,7 +4,9 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::sync::Mutex as StdMutex; +use anyhow::anyhow; use portable_pty::MasterPty; +use portable_pty::PtySize; use portable_pty::SlavePty; use tokio::sync::broadcast; use tokio::sync::mpsc; @@ -16,6 +18,29 @@ pub(crate) trait ChildTerminator: Send + Sync { fn kill(&mut self) -> io::Result<()>; } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct TerminalSize { + pub rows: u16, + pub cols: u16, +} + +impl Default for TerminalSize { + fn default() -> Self { + Self { rows: 24, cols: 80 } + } +} + +impl From for PtySize { + fn from(value: TerminalSize) -> Self { + Self { + rows: value.rows, + cols: value.cols, + pixel_width: 0, + pixel_height: 0, + } + } +} + pub struct PtyHandles { pub _slave: Option>, pub _master: Box, @@ -29,8 +54,7 @@ impl fmt::Debug for PtyHandles { /// Handle for driving an interactive process (PTY or pipe). pub struct ProcessHandle { - writer_tx: mpsc::Sender>, - output_tx: broadcast::Sender>, + writer_tx: StdMutex>>>, killer: StdMutex>>, reader_handle: StdMutex>>, reader_abort_handles: StdMutex>, @@ -53,8 +77,6 @@ impl ProcessHandle { #[allow(clippy::too_many_arguments)] pub(crate) fn new( writer_tx: mpsc::Sender>, - output_tx: broadcast::Sender>, - initial_output_rx: broadcast::Receiver>, killer: Box, reader_handle: JoinHandle<()>, reader_abort_handles: Vec, @@ -63,32 +85,31 @@ impl ProcessHandle { exit_status: Arc, exit_code: Arc>>, pty_handles: Option, - ) -> (Self, broadcast::Receiver>) { - ( - Self { - writer_tx, - output_tx, - killer: StdMutex::new(Some(killer)), - reader_handle: StdMutex::new(Some(reader_handle)), - reader_abort_handles: StdMutex::new(reader_abort_handles), - writer_handle: StdMutex::new(Some(writer_handle)), - wait_handle: StdMutex::new(Some(wait_handle)), - exit_status, - exit_code, - _pty_handles: StdMutex::new(pty_handles), - }, - initial_output_rx, - ) + ) -> Self { + Self { + writer_tx: StdMutex::new(Some(writer_tx)), + killer: StdMutex::new(Some(killer)), + reader_handle: StdMutex::new(Some(reader_handle)), + reader_abort_handles: StdMutex::new(reader_abort_handles), + writer_handle: StdMutex::new(Some(writer_handle)), + wait_handle: StdMutex::new(Some(wait_handle)), + exit_status, + exit_code, + _pty_handles: StdMutex::new(pty_handles), + } } /// Returns a channel sender for writing raw bytes to the child stdin. pub fn writer_sender(&self) -> mpsc::Sender> { - self.writer_tx.clone() - } + if let Ok(writer_tx) = self.writer_tx.lock() { + if let Some(writer_tx) = writer_tx.as_ref() { + return writer_tx.clone(); + } + } - /// Returns a broadcast receiver that yields stdout/stderr chunks. - pub fn output_receiver(&self) -> broadcast::Receiver> { - self.output_tx.subscribe() + let (writer_tx, writer_rx) = mpsc::channel(1); + drop(writer_rx); + writer_tx } /// True if the child process has exited. @@ -101,13 +122,38 @@ impl ProcessHandle { self.exit_code.lock().ok().and_then(|guard| *guard) } - /// Attempts to kill the child and abort helper tasks. - pub fn terminate(&self) { + /// Resize the PTY in character cells. + pub fn resize(&self, size: TerminalSize) -> anyhow::Result<()> { + let handles = self + ._pty_handles + .lock() + .map_err(|_| anyhow!("failed to lock PTY handles"))?; + let handles = handles + .as_ref() + .ok_or_else(|| anyhow!("process is not attached to a PTY"))?; + handles._master.resize(size.into()) + } + + /// Close the child's stdin channel. + pub fn close_stdin(&self) { + if let Ok(mut writer_tx) = self.writer_tx.lock() { + writer_tx.take(); + } + } + + /// Attempts to kill the child while leaving the reader/writer tasks alive + /// so callers can still drain output until EOF. + pub fn request_terminate(&self) { if let Ok(mut killer_opt) = self.killer.lock() { if let Some(mut killer) = killer_opt.take() { let _ = killer.kill(); } } + } + + /// Attempts to kill the child and abort helper tasks. + pub fn terminate(&self) { + self.request_terminate(); if let Ok(mut h) = self.reader_handle.lock() { if let Some(handle) = h.take() { @@ -138,10 +184,46 @@ impl Drop for ProcessHandle { } } -/// Return value from spawn helpers (PTY or pipe). +/// Combine split stdout/stderr receivers into a single broadcast receiver. +pub fn combine_output_receivers( + mut stdout_rx: mpsc::Receiver>, + mut stderr_rx: mpsc::Receiver>, +) -> broadcast::Receiver> { + let (combined_tx, combined_rx) = broadcast::channel(256); + tokio::spawn(async move { + let mut stdout_open = true; + let mut stderr_open = true; + + loop { + tokio::select! { + stdout = stdout_rx.recv(), if stdout_open => match stdout { + Some(chunk) => { + let _ = combined_tx.send(chunk); + } + None => { + stdout_open = false; + } + }, + stderr = stderr_rx.recv(), if stderr_open => match stderr { + Some(chunk) => { + let _ = combined_tx.send(chunk); + } + None => { + stderr_open = false; + } + }, + else => break, + } + } + }); + combined_rx +} + +/// Return value from PTY or pipe spawn helpers. #[derive(Debug)] pub struct SpawnedProcess { pub session: ProcessHandle, - pub output_rx: broadcast::Receiver>, + pub stdout_rx: mpsc::Receiver>, + pub stderr_rx: mpsc::Receiver>, pub exit_rx: oneshot::Receiver, } diff --git a/codex-rs/utils/pty/src/pty.rs b/codex-rs/utils/pty/src/pty.rs index 367cb5b1d..63ea838d8 100644 --- a/codex-rs/utils/pty/src/pty.rs +++ b/codex-rs/utils/pty/src/pty.rs @@ -10,8 +10,6 @@ use anyhow::Result; #[cfg(not(windows))] use portable_pty::native_pty_system; use portable_pty::CommandBuilder; -use portable_pty::PtySize; -use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::task::JoinHandle; @@ -20,6 +18,7 @@ use crate::process::ChildTerminator; use crate::process::ProcessHandle; use crate::process::PtyHandles; use crate::process::SpawnedProcess; +use crate::process::TerminalSize; /// Returns true when ConPTY support is available (Windows only). #[cfg(windows)] @@ -72,25 +71,21 @@ fn platform_native_pty_system() -> Box { } } -/// Spawn a process attached to a PTY, returning handles for stdin, output, and exit. +/// Spawn a process attached to a PTY, returning handles for stdin, split output, and exit. pub async fn spawn_process( program: &str, args: &[String], cwd: &Path, env: &HashMap, arg0: &Option, + size: TerminalSize, ) -> Result { if program.is_empty() { anyhow::bail!("missing program for PTY spawn"); } let pty_system = platform_native_pty_system(); - let pair = pty_system.openpty(PtySize { - rows: 24, - cols: 80, - pixel_width: 0, - pixel_height: 0, - })?; + let pair = pty_system.openpty(size.into())?; let mut command_builder = CommandBuilder::new(arg0.as_ref().unwrap_or(&program.to_string())); command_builder.cwd(cwd); @@ -111,18 +106,16 @@ pub async fn spawn_process( let killer = child.clone_killer(); let (writer_tx, mut writer_rx) = mpsc::channel::>(128); - let (output_tx, _) = broadcast::channel::>(256); - let initial_output_rx = output_tx.subscribe(); - + let (stdout_tx, stdout_rx) = mpsc::channel::>(128); + let (_stderr_tx, stderr_rx) = mpsc::channel::>(1); let mut reader = pair.master.try_clone_reader()?; - let output_tx_clone = output_tx.clone(); let reader_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || { let mut buf = [0u8; 8_192]; loop { match reader.read(&mut buf) { Ok(0) => break, Ok(n) => { - let _ = output_tx_clone.send(buf[..n].to_vec()); + let _ = stdout_tx.blocking_send(buf[..n].to_vec()); } Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, Err(ref e) if e.kind() == ErrorKind::WouldBlock => { @@ -174,10 +167,8 @@ pub async fn spawn_process( _master: pair.master, }; - let (handle, output_rx) = ProcessHandle::new( + let handle = ProcessHandle::new( writer_tx, - output_tx, - initial_output_rx, Box::new(PtyChildTerminator { killer, #[cfg(unix)] @@ -194,7 +185,8 @@ pub async fn spawn_process( Ok(SpawnedProcess { session: handle, - output_rx, + stdout_rx, + stderr_rx, exit_rx, }) } diff --git a/codex-rs/utils/pty/src/tests.rs b/codex-rs/utils/pty/src/tests.rs index ce50adddc..528bdf989 100644 --- a/codex-rs/utils/pty/src/tests.rs +++ b/codex-rs/utils/pty/src/tests.rs @@ -3,8 +3,12 @@ use std::path::Path; use pretty_assertions::assert_eq; +use crate::combine_output_receivers; use crate::spawn_pipe_process; +use crate::spawn_pipe_process_no_stdin; use crate::spawn_pty_process; +use crate::SpawnedProcess; +use crate::TerminalSize; fn find_python() -> Option { for candidate in ["python3", "python"] { @@ -51,6 +55,38 @@ fn echo_sleep_command(marker: &str) -> String { } } +fn split_stdout_stderr_command() -> String { + "printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string() +} + +async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver>) -> Vec { + let mut collected = Vec::new(); + while let Some(chunk) = output_rx.recv().await { + collected.extend_from_slice(&chunk); + } + collected +} + +fn combine_spawned_output( + spawned: SpawnedProcess, +) -> ( + crate::ProcessHandle, + tokio::sync::broadcast::Receiver>, + tokio::sync::oneshot::Receiver, +) { + let SpawnedProcess { + session, + stdout_rx, + stderr_rx, + exit_rx, + } = spawned; + ( + session, + combine_output_receivers(stdout_rx, stderr_rx), + exit_rx, + ) +} + async fn collect_output_until_exit( mut output_rx: tokio::sync::broadcast::Receiver>, exit_rx: tokio::sync::oneshot::Receiver, @@ -219,9 +255,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> { }; let env_map: HashMap = std::env::vars().collect(); - let spawned = spawn_pty_process(&python, &[], Path::new("."), &env_map, &None).await?; - let writer = spawned.session.writer_sender(); - let mut output_rx = spawned.output_rx; + let spawned = spawn_pty_process( + &python, + &[], + Path::new("."), + &env_map, + &None, + TerminalSize::default(), + ) + .await?; + let (session, mut output_rx, exit_rx) = combine_spawned_output(spawned); + let writer = session.writer_sender(); let newline = if cfg!(windows) { "\r\n" } else { "\n" }; let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 }; let mut output = @@ -232,8 +276,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> { writer.send(format!("exit(){newline}").into_bytes()).await?; let timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 }; - let (remaining_output, code) = - collect_output_until_exit(output_rx, spawned.exit_rx, timeout_ms).await; + let (remaining_output, code) = collect_output_until_exit(output_rx, exit_rx, timeout_ms).await; output.extend_from_slice(&remaining_output); let text = String::from_utf8_lossy(&output); @@ -260,10 +303,11 @@ async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> { ]; let env_map: HashMap = std::env::vars().collect(); let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?; - let writer = spawned.session.writer_sender(); + let (session, output_rx, exit_rx) = combine_spawned_output(spawned); + let writer = session.writer_sender(); writer.send(b"roundtrip\n".to_vec()).await?; - let (output, code) = collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 5_000).await; + let (output, code) = collect_output_until_exit(output_rx, exit_rx, 5_000).await; let text = String::from_utf8_lossy(&output); assert!( @@ -288,7 +332,7 @@ async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> { let (program, args) = shell_command(script); let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?; - let mut output_rx = spawned.output_rx; + let (_session, mut output_rx, exit_rx) = combine_spawned_output(spawned); let pid_bytes = tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv()).await??; let pid_text = String::from_utf8_lossy(&pid_bytes); @@ -309,7 +353,7 @@ async fn pipe_process_detaches_from_parent_session() -> anyhow::Result<()> { "expected child to be detached from parent session" ); - let exit_code = spawned.exit_rx.await.unwrap_or(-1); + let exit_code = exit_rx.await.unwrap_or(-1); assert_eq!( exit_code, 0, "expected detached pipe process to exit cleanly" @@ -327,13 +371,23 @@ async fn pipe_and_pty_share_interface() -> anyhow::Result<()> { let pipe = spawn_pipe_process(&pipe_program, &pipe_args, Path::new("."), &env_map, &None).await?; - let pty = spawn_pty_process(&pty_program, &pty_args, Path::new("."), &env_map, &None).await?; + let pty = spawn_pty_process( + &pty_program, + &pty_args, + Path::new("."), + &env_map, + &None, + TerminalSize::default(), + ) + .await?; + let (_pipe_session, pipe_output_rx, pipe_exit_rx) = combine_spawned_output(pipe); + let (_pty_session, pty_output_rx, pty_exit_rx) = combine_spawned_output(pty); let timeout_ms = if cfg!(windows) { 10_000 } else { 3_000 }; let (pipe_out, pipe_code) = - collect_output_until_exit(pipe.output_rx, pipe.exit_rx, timeout_ms).await; + collect_output_until_exit(pipe_output_rx, pipe_exit_rx, timeout_ms).await; let (pty_out, pty_code) = - collect_output_until_exit(pty.output_rx, pty.exit_rx, timeout_ms).await; + collect_output_until_exit(pty_output_rx, pty_exit_rx, timeout_ms).await; assert_eq!(pipe_code, 0); assert_eq!(pty_code, 0); @@ -360,9 +414,9 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> { let args = vec!["-c".to_string(), script.to_string()]; let env_map: HashMap = std::env::vars().collect(); let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?; + let (_session, output_rx, exit_rx) = combine_spawned_output(spawned); - let (output, code) = - collect_output_until_exit(spawned.output_rx, spawned.exit_rx, 10_000).await; + let (output, code) = collect_output_until_exit(output_rx, exit_rx, 10_000).await; assert_eq!(code, 0, "expected python to exit cleanly"); assert!(!output.is_empty(), "expected stderr output to be drained"); @@ -370,6 +424,53 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> { + let env_map: HashMap = std::env::vars().collect(); + let (program, args) = if cfg!(windows) { + let Some(python) = find_python() else { + eprintln!("python not found; skipping pipe_process_can_expose_split_stdout_and_stderr"); + return Ok(()); + }; + ( + python, + vec![ + "-c".to_string(), + "import sys; sys.stdout.buffer.write(b'split-out\\n'); sys.stdout.buffer.flush(); sys.stderr.buffer.write(b'split-err\\n'); sys.stderr.buffer.flush()".to_string(), + ], + ) + } else { + shell_command(&split_stdout_stderr_command()) + }; + let spawned = + spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?; + let SpawnedProcess { + session: _session, + stdout_rx, + stderr_rx, + exit_rx, + } = spawned; + + let stdout_task = tokio::spawn(async move { collect_split_output(stdout_rx).await }); + let stderr_task = tokio::spawn(async move { collect_split_output(stderr_rx).await }); + let code = tokio::time::timeout(tokio::time::Duration::from_secs(2), exit_rx) + .await + .map_err(|_| anyhow::anyhow!("timed out waiting for split process exit"))? + .unwrap_or(-1); + let stdout = tokio::time::timeout(tokio::time::Duration::from_secs(2), stdout_task) + .await + .map_err(|_| anyhow::anyhow!("timed out waiting to drain split stdout"))??; + let stderr = tokio::time::timeout(tokio::time::Duration::from_secs(2), stderr_task) + .await + .map_err(|_| anyhow::anyhow!("timed out waiting to drain split stderr"))??; + + assert_eq!(stdout, b"split-out\n".to_vec()); + assert_eq!(stderr, b"split-err\n".to_vec()); + assert_eq!(code, 0); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pipe_terminate_aborts_detached_readers() -> anyhow::Result<()> { if !setsid_available() { @@ -381,17 +482,15 @@ async fn pipe_terminate_aborts_detached_readers() -> anyhow::Result<()> { let script = "setsid sh -c 'i=0; while [ $i -lt 200 ]; do echo tick; sleep 0.01; i=$((i+1)); done' &"; let (program, args) = shell_command(script); - let mut spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?; + let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?; + let (session, mut output_rx, _exit_rx) = combine_spawned_output(spawned); - let _ = tokio::time::timeout( - tokio::time::Duration::from_millis(500), - spawned.output_rx.recv(), - ) - .await - .map_err(|_| anyhow::anyhow!("expected detached output before terminate"))??; + let _ = tokio::time::timeout(tokio::time::Duration::from_millis(500), output_rx.recv()) + .await + .map_err(|_| anyhow::anyhow!("expected detached output before terminate"))??; - spawned.session.terminate(); - let mut post_rx = spawned.session.output_receiver(); + session.terminate(); + let mut post_rx = output_rx.resubscribe(); let post_terminate = tokio::time::timeout(tokio::time::Duration::from_millis(200), post_rx.recv()).await; @@ -416,12 +515,21 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh let marker = "__codex_bg_pid:"; let script = format!("sleep 1000 & bg=$!; echo {marker}$bg; wait"); let (program, args) = shell_command(&script); - let mut spawned = spawn_pty_process(&program, &args, Path::new("."), &env_map, &None).await?; + let spawned = spawn_pty_process( + &program, + &args, + Path::new("."), + &env_map, + &None, + TerminalSize::default(), + ) + .await?; + let (session, mut output_rx, _exit_rx) = combine_spawned_output(spawned); - let bg_pid = match wait_for_marker_pid(&mut spawned.output_rx, marker, 2_000).await { + let bg_pid = match wait_for_marker_pid(&mut output_rx, marker, 2_000).await { Ok(pid) => pid, Err(err) => { - spawned.session.terminate(); + session.terminate(); return Err(err); } }; @@ -430,7 +538,7 @@ async fn pty_terminate_kills_background_children_in_same_process_group() -> anyh "expected background child pid {bg_pid} to exist before terminate" ); - spawned.session.terminate(); + session.terminate(); let exited = wait_for_process_exit(bg_pid, 3_000).await?; if !exited {