From f2969f36e8b6aa2aefcd625d2a9fc8425bb2a519 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 9 Jun 2026 15:10:17 -0700 Subject: [PATCH] [codex] Handle Ctrl-C for non-TTY unified exec (#26734) ## Why A long-running unified exec process started with `tty: false` could not be interrupted via `write_stdin`: ordinary non-TTY stdin writes are rejected once stdin is closed, but an exact U+0003 payload should still map to a process interrupt. The interrupt should flow through the same process lifecycle path as a real signal so Codex preserves process-reported output and exit metadata instead of fabricating a Ctrl-C exit code or tearing down the session early. ## What Changed - Add `process/signal` to exec-server with `ProcessSignal::Interrupt` and an empty response. - Add a non-consuming `ProcessHandle::signal` path for spawned processes; on Unix it sends SIGINT to the process group and leaves terminate/hard-kill unchanged. - Route non-TTY U+0003 `write_stdin` through `process.signal(...)` instead of `terminate`, then let the normal post-write collection path drain output and observe exit. - Add exec-server coverage where a shell `trap INT` handler prints the signal and exits with its own code. - Add unified exec coverage where a `tty: false` process traps SIGINT, emits output, and exits with its own code. ## Validation - `just test -p codex-exec-server exec_process_signal_interrupts_process` - `just test -p codex-exec-server` - `just test -p codex-core write_stdin_ctrl_c_interrupts_non_tty_session` --- codex-rs/core/src/unified_exec/process.rs | 15 +- .../core/src/unified_exec/process_manager.rs | 40 +-- .../core/src/unified_exec/process_tests.rs | 5 + codex-rs/core/tests/suite/unified_exec.rs | 243 ++++++++++++++++++ codex-rs/exec-server/src/client.rs | 25 ++ codex-rs/exec-server/src/lib.rs | 3 + codex-rs/exec-server/src/local_process.rs | 54 +++- codex-rs/exec-server/src/process.rs | 3 + codex-rs/exec-server/src/protocol.rs | 18 ++ codex-rs/exec-server/src/remote_process.rs | 6 + codex-rs/exec-server/src/server/handler.rs | 10 + .../exec-server/src/server/process_handler.rs | 9 + codex-rs/exec-server/src/server/registry.rs | 8 + codex-rs/exec-server/tests/exec_process.rs | 125 ++++++++- codex-rs/utils/pty/src/lib.rs | 2 + codex-rs/utils/pty/src/pipe.rs | 20 +- codex-rs/utils/pty/src/process.rs | 48 ++++ codex-rs/utils/pty/src/process_group.rs | 43 ++-- codex-rs/utils/pty/src/pty.rs | 26 +- 19 files changed, 659 insertions(+), 44 deletions(-) diff --git a/codex-rs/core/src/unified_exec/process.rs b/codex-rs/core/src/unified_exec/process.rs index 9671429d0..229e5b52f 100644 --- a/codex-rs/core/src/unified_exec/process.rs +++ b/codex-rs/core/src/unified_exec/process.rs @@ -14,6 +14,7 @@ use tokio_util::sync::CancellationToken; use crate::exec::is_likely_sandbox_denied; use codex_exec_server::ExecProcess; +use codex_exec_server::ProcessSignal as ExecServerProcessSignal; use codex_exec_server::ReadResponse as ExecReadResponse; use codex_exec_server::StartedExecProcess; use codex_exec_server::WriteStatus; @@ -23,6 +24,7 @@ use codex_protocol::protocol::TruncationPolicy; use codex_sandboxing::SandboxType; use codex_utils_output_truncation::formatted_truncate_text; use codex_utils_pty::ExecCommandSession; +use codex_utils_pty::ProcessSignal as PtyProcessSignal; use codex_utils_pty::SpawnedPty; use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS; @@ -31,7 +33,6 @@ use super::head_tail_buffer::HeadTailBuffer; use super::process_state::ProcessState; const EARLY_EXIT_GRACE_PERIOD: Duration = Duration::from_millis(150); - pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync { /// Returns file descriptors that must stay open across the child `exec()`. /// @@ -212,6 +213,18 @@ impl UnifiedExecProcess { } } + pub(super) async fn interrupt(&self) -> Result<(), UnifiedExecError> { + match &self.process_handle { + ProcessHandle::Local(process_handle) => process_handle + .signal(PtyProcessSignal::Interrupt) + .map_err(|err| UnifiedExecError::process_failed(err.to_string())), + ProcessHandle::ExecServer(process_handle) => process_handle + .signal(ExecServerProcessSignal::Interrupt) + .await + .map_err(|err| UnifiedExecError::process_failed(err.to_string())), + } + } + pub(super) fn fail_and_terminate(&self, message: String) { let state = self.state_rx.borrow().clone(); if state.failure_message.is_none() { diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index cb455b113..c9e6f3fdc 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -72,6 +72,7 @@ const UNIFIED_EXEC_ENV: [(&str, &str); 10] = [ const NETWORK_ACCESS_DENIED_MESSAGE: &str = "Network access was denied by the Codex sandbox network proxy."; const LATE_NETWORK_DENIAL_GRACE_PERIOD: Duration = Duration::from_millis(100); +const INTERRUPT: &str = "\u{3}"; /// Test-only override for deterministic unified exec process IDs. /// @@ -617,24 +618,29 @@ impl UnifiedExecProcessManager { if !request.input.is_empty() { if !tty { - return Err(UnifiedExecError::StdinClosed); - } - match process.write(request.input.as_bytes()).await { - Ok(()) => { - // Give the remote process a brief window to react so that we are - // more likely to capture its output in the poll below. - tokio::time::sleep(Duration::from_millis(100)).await; + if request.input == INTERRUPT { + process.interrupt().await?; + } else { + return Err(UnifiedExecError::StdinClosed); } - Err(err) => { - let status = self.refresh_process_state(process_id).await; - if matches!(status, ProcessStatus::Exited { .. }) { - status_after_write = Some(status); - } else if matches!(err, UnifiedExecError::ProcessFailed { .. }) { - process.terminate(); - self.release_process_id(process_id).await; - return Err(err); - } else { - return Err(err); + } else { + match process.write(request.input.as_bytes()).await { + Ok(()) => { + // Give the remote process a brief window to react so that we are + // more likely to capture its output in the poll below. + tokio::time::sleep(Duration::from_millis(100)).await; + } + Err(err) => { + let status = self.refresh_process_state(process_id).await; + if matches!(status, ProcessStatus::Exited { .. }) { + status_after_write = Some(status); + } else if matches!(err, UnifiedExecError::ProcessFailed { .. }) { + process.terminate(); + self.release_process_id(process_id).await; + return Err(err); + } else { + return Err(err); + } } } } diff --git a/codex-rs/core/src/unified_exec/process_tests.rs b/codex-rs/core/src/unified_exec/process_tests.rs index ee0ea1cba..42db18ff7 100644 --- a/codex-rs/core/src/unified_exec/process_tests.rs +++ b/codex-rs/core/src/unified_exec/process_tests.rs @@ -5,6 +5,7 @@ use codex_exec_server::ExecProcess; use codex_exec_server::ExecProcessEventReceiver; use codex_exec_server::ExecServerError; use codex_exec_server::ProcessId; +use codex_exec_server::ProcessSignal; use codex_exec_server::ReadResponse; use codex_exec_server::StartedExecProcess; use codex_exec_server::WriteResponse; @@ -63,6 +64,10 @@ impl ExecProcess for MockExecProcess { Ok(self.write_response.clone()) } + async fn signal(&self, _signal: ProcessSignal) -> Result<(), ExecServerError> { + Ok(()) + } + async fn terminate(&self) -> Result<(), ExecServerError> { Ok(()) } diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index caea5ecf5..eb8bf2955 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -1995,6 +1995,249 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> { + assert_write_stdin_ctrl_c_interrupts_non_tty_session( + "trap", + "trap 'echo INT-TRAP; exit 42' INT; echo READY; while true; do sleep 30; done", + /*expected_exit_code*/ 42, + Some("INT-TRAP"), + ) + .await +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn write_stdin_ctrl_c_default_interrupt_reports_130_for_non_tty_session() -> Result<()> { + assert_write_stdin_ctrl_c_interrupts_non_tty_session( + "default", + "echo READY; exec sleep 30", + /*expected_exit_code*/ 130, + /*expected_interrupt_output*/ None, + ) + .await +} + +async fn assert_write_stdin_ctrl_c_interrupts_non_tty_session( + test_name: &str, + command: &str, + expected_exit_code: i32, + expected_interrupt_output: Option<&str>, +) -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + skip_if_windows!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + if let Err(err) = config.features.enable(Feature::UnifiedExec) { + panic!("test config should allow feature update: {err}"); + } + }); + let test = builder.build_with_remote_env(&server).await?; + + let start_call_id = format!("uexec-non-tty-interrupt-{test_name}-start"); + let interrupt_call_id = format!("uexec-non-tty-interrupt-{test_name}"); + + let start_args = serde_json::json!({ + "cmd": command, + "yield_time_ms": 250, + "tty": false, + }); + let interrupt_args = serde_json::json!({ + "chars": "\u{3}", + "session_id": 1000, + "yield_time_ms": 1000, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + &start_call_id, + "exec_command", + &serde_json::to_string(&start_args)?, + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_function_call( + &interrupt_call_id, + "write_stdin", + &serde_json::to_string(&interrupt_args)?, + ), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-3"), + ]), + ]; + let request_log = mount_sse_sequence(&server, responses).await; + + submit_unified_exec_turn( + &test, + "interrupt non-tty unified exec", + PermissionProfile::Disabled, + ) + .await?; + + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + let requests = request_log.requests(); + assert!(!requests.is_empty(), "expected at least one POST request"); + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); + + let outputs = collect_tool_outputs(&bodies)?; + + let start_output = outputs + .get(&start_call_id) + .with_context(|| format!("missing start output for exec_command {start_call_id}"))?; + assert_eq!( + start_output.process_id.as_deref(), + Some("1000"), + "exec_command should leave a running non-TTY session" + ); + assert!( + start_output.exit_code.is_none(), + "initial exec_command should not include exit_code while session is running" + ); + assert!( + start_output.output.contains("READY"), + "start output should include command readiness marker, got {:?}", + start_output.output + ); + + let interrupt_output = outputs + .get(&interrupt_call_id) + .with_context(|| format!("missing interrupt output for write_stdin {interrupt_call_id}"))?; + assert!( + interrupt_output.process_id.is_none(), + "interrupted process should be cleared from the session map" + ); + assert_eq!( + interrupt_output.exit_code, + Some(expected_exit_code), + "interrupt should preserve the process-reported exit code" + ); + if let Some(expected_interrupt_output) = expected_interrupt_output { + assert!( + interrupt_output.output.contains(expected_interrupt_output), + "interrupt should drain output from the signal handler, got {:?}", + interrupt_output.output + ); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[cfg_attr(not(windows), ignore = "Windows-only unified exec interrupt test")] +async fn write_stdin_ctrl_c_reports_unsupported_interrupt_to_model_on_windows() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config + .features + .enable(Feature::UnifiedExec) + .expect("test config should allow feature update"); + }); + let test = builder.build_with_remote_env(&server).await?; + + let start_call_id = "uexec-windows-interrupt-start"; + let interrupt_call_id = "uexec-windows-interrupt"; + + let start_args = serde_json::json!({ + "shell": "cmd", + "cmd": "echo READY && ping -n 30 127.0.0.1 >NUL", + "yield_time_ms": 250, + "tty": false, + }); + let interrupt_args = serde_json::json!({ + "chars": "\u{3}", + "session_id": 1000, + "yield_time_ms": 1000, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + start_call_id, + "exec_command", + &serde_json::to_string(&start_args)?, + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_function_call( + interrupt_call_id, + "write_stdin", + &serde_json::to_string(&interrupt_args)?, + ), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_assistant_message("msg-1", "done"), + ev_completed("resp-3"), + ]), + ]; + let request_log = mount_sse_sequence(&server, responses).await; + + submit_unified_exec_turn( + &test, + "interrupt non-tty unified exec on Windows", + PermissionProfile::Disabled, + ) + .await?; + + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + let start_output = request_log + .function_call_output_text(start_call_id) + .expect("missing start output for exec_command"); + let start_output = parse_unified_exec_output(&start_output)?; + assert_eq!( + start_output.process_id.as_deref(), + Some("1000"), + "exec_command should leave a running non-TTY session" + ); + assert!( + start_output.output.contains("READY"), + "start output should include command readiness marker, got {:?}", + start_output.output + ); + + let interrupt_output = request_log + .function_call_output_text(interrupt_call_id) + .expect("missing interrupt output for write_stdin"); + assert!( + interrupt_output.contains("write_stdin failed"), + "model-visible write_stdin output should report failure, got {interrupt_output:?}" + ); + assert!( + interrupt_output.contains("process interrupt is not supported by this process backend"), + "model-visible write_stdin output should explain unsupported interrupt, got {interrupt_output:?}" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 8cf18e7d1..26b3e46e8 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -35,6 +35,7 @@ use crate::protocol::EXEC_EXITED_METHOD; use crate::protocol::EXEC_METHOD; use crate::protocol::EXEC_OUTPUT_DELTA_METHOD; use crate::protocol::EXEC_READ_METHOD; +use crate::protocol::EXEC_SIGNAL_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; use crate::protocol::EXEC_WRITE_METHOD; use crate::protocol::EnvironmentInfo; @@ -80,8 +81,11 @@ use crate::protocol::INITIALIZED_METHOD; use crate::protocol::InitializeParams; use crate::protocol::InitializeResponse; use crate::protocol::ProcessOutputChunk; +use crate::protocol::ProcessSignal; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; +use crate::protocol::SignalParams; +use crate::protocol::SignalResponse; use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; @@ -394,6 +398,23 @@ impl ExecServerClient { .await } + pub async fn signal( + &self, + process_id: &ProcessId, + signal: ProcessSignal, + ) -> Result<(), ExecServerError> { + let _response: SignalResponse = self + .call( + EXEC_SIGNAL_METHOD, + &SignalParams { + process_id: process_id.clone(), + signal, + }, + ) + .await?; + Ok(()) + } + pub async fn terminate( &self, process_id: &ProcessId, @@ -763,6 +784,10 @@ impl Session { self.client.write(&self.process_id, chunk).await } + pub(crate) async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError> { + self.client.signal(&self.process_id, signal).await + } + pub(crate) async fn terminate(&self) -> Result<(), ExecServerError> { self.client.terminate(&self.process_id).await?; Ok(()) diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index e306ec395..378d12e63 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -93,9 +93,12 @@ pub use protocol::HttpRequestResponse; pub use protocol::InitializeParams; pub use protocol::InitializeResponse; pub use protocol::ProcessOutputChunk; +pub use protocol::ProcessSignal; pub use protocol::ReadParams; pub use protocol::ReadResponse; pub use protocol::ShellInfo; +pub use protocol::SignalParams; +pub use protocol::SignalResponse; pub use protocol::TerminateParams; pub use protocol::TerminateResponse; pub use protocol::WriteParams; diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs index bc69ec610..691f80703 100644 --- a/codex-rs/exec-server/src/local_process.rs +++ b/codex-rs/exec-server/src/local_process.rs @@ -10,6 +10,7 @@ use codex_protocol::config_types::EnvironmentVariablePattern; use codex_protocol::config_types::ShellEnvironmentPolicy; use codex_protocol::shell_environment; use codex_utils_pty::ExecCommandSession; +use codex_utils_pty::ProcessSignal as PtyProcessSignal; use codex_utils_pty::TerminalSize; use tokio::sync::Mutex; use tokio::sync::Notify; @@ -33,8 +34,11 @@ use crate::protocol::ExecOutputStream; use crate::protocol::ExecParams; use crate::protocol::ExecResponse; use crate::protocol::ProcessOutputChunk; +use crate::protocol::ProcessSignal; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; +use crate::protocol::SignalParams; +use crate::protocol::SignalResponse; use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; @@ -272,7 +276,6 @@ impl LocalProcess { &self, params: ReadParams, ) -> Result { - let _process_id = params.process_id.clone(); let after_seq = params.after_seq.unwrap_or(0); let max_bytes = params.max_bytes.unwrap_or(usize::MAX); let wait = Duration::from_millis(params.wait_ms.unwrap_or(0)); @@ -351,7 +354,6 @@ impl LocalProcess { &self, params: WriteParams, ) -> Result { - let _process_id = params.process_id.clone(); let _input_bytes = params.chunk.0.len(); let writer_tx = { let process_map = self.inner.processes.lock().await; @@ -383,11 +385,33 @@ impl LocalProcess { }) } + pub(crate) async fn signal_process( + &self, + params: SignalParams, + ) -> Result { + { + let process_map = self.inner.processes.lock().await; + match process_map.get(¶ms.process_id) { + Some(ProcessEntry::Running(process)) => { + if process.exit_code.is_some() { + return Ok(SignalResponse {}); + } + process + .session + .signal(pty_process_signal(params.signal)) + .map_err(|err| internal_error(format!("failed to signal process: {err}")))? + } + Some(ProcessEntry::Starting) | None => {} + } + } + + Ok(SignalResponse {}) + } + pub(crate) async fn terminate_process( &self, params: TerminateParams, ) -> Result { - let _process_id = params.process_id.clone(); let running = { let process_map = self.inner.processes.lock().await; match process_map.get(¶ms.process_id) { @@ -483,6 +507,10 @@ impl ExecProcess for LocalExecProcess { self.backend.write(&self.process_id, chunk).await } + async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError> { + self.backend.signal(&self.process_id, signal).await + } + async fn terminate(&self) -> Result<(), ExecServerError> { self.backend.terminate(&self.process_id).await } @@ -519,6 +547,20 @@ impl LocalProcess { .map_err(map_handler_error) } + async fn signal( + &self, + process_id: &ProcessId, + signal: ProcessSignal, + ) -> Result<(), ExecServerError> { + self.signal_process(SignalParams { + process_id: process_id.clone(), + signal, + }) + .await + .map_err(map_handler_error)?; + Ok(()) + } + async fn terminate(&self, process_id: &ProcessId) -> Result<(), ExecServerError> { self.terminate_process(TerminateParams { process_id: process_id.clone(), @@ -529,6 +571,12 @@ impl LocalProcess { } } +fn pty_process_signal(signal: ProcessSignal) -> PtyProcessSignal { + match signal { + ProcessSignal::Interrupt => PtyProcessSignal::Interrupt, + } +} + fn map_handler_error(error: JSONRPCErrorError) -> ExecServerError { ExecServerError::Server { code: error.code, diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs index cb6c83213..97f53aaef 100644 --- a/codex-rs/exec-server/src/process.rs +++ b/codex-rs/exec-server/src/process.rs @@ -10,6 +10,7 @@ use crate::ExecServerError; use crate::ProcessId; use crate::protocol::ExecParams; use crate::protocol::ProcessOutputChunk; +use crate::protocol::ProcessSignal; use crate::protocol::ReadResponse; use crate::protocol::WriteResponse; @@ -178,6 +179,8 @@ pub trait ExecProcess: Send + Sync { async fn write(&self, chunk: Vec) -> Result; + async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError>; + async fn terminate(&self) -> Result<(), ExecServerError>; } diff --git a/codex-rs/exec-server/src/protocol.rs b/codex-rs/exec-server/src/protocol.rs index 6eeeeefdd..b85bff357 100644 --- a/codex-rs/exec-server/src/protocol.rs +++ b/codex-rs/exec-server/src/protocol.rs @@ -15,6 +15,7 @@ pub const INITIALIZED_METHOD: &str = "initialized"; pub const EXEC_METHOD: &str = "process/start"; pub const EXEC_READ_METHOD: &str = "process/read"; pub const EXEC_WRITE_METHOD: &str = "process/write"; +pub const EXEC_SIGNAL_METHOD: &str = "process/signal"; pub const EXEC_TERMINATE_METHOD: &str = "process/terminate"; pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output"; pub const EXEC_EXITED_METHOD: &str = "process/exited"; @@ -166,6 +167,23 @@ pub struct WriteResponse { pub status: WriteStatus, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ProcessSignal { + Interrupt, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SignalParams { + pub process_id: ProcessId, + pub signal: ProcessSignal, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SignalResponse {} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct TerminateParams { diff --git a/codex-rs/exec-server/src/remote_process.rs b/codex-rs/exec-server/src/remote_process.rs index d8d06735c..c7d16d4d4 100644 --- a/codex-rs/exec-server/src/remote_process.rs +++ b/codex-rs/exec-server/src/remote_process.rs @@ -12,6 +12,7 @@ use crate::StartedExecProcess; use crate::client::LazyRemoteExecServerClient; use crate::client::Session; use crate::protocol::ExecParams; +use crate::protocol::ProcessSignal; use crate::protocol::ReadResponse; use crate::protocol::WriteResponse; @@ -76,6 +77,11 @@ impl ExecProcess for RemoteExecProcess { self.session.write(chunk).await } + async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError> { + trace!("exec process signal"); + self.session.signal(signal).await + } + async fn terminate(&self) -> Result<(), ExecServerError> { trace!("exec process terminate"); self.session.terminate().await diff --git a/codex-rs/exec-server/src/server/handler.rs b/codex-rs/exec-server/src/server/handler.rs index b8934705d..395ce6773 100644 --- a/codex-rs/exec-server/src/server/handler.rs +++ b/codex-rs/exec-server/src/server/handler.rs @@ -42,6 +42,8 @@ use crate::protocol::InitializeParams; use crate::protocol::InitializeResponse; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; +use crate::protocol::SignalParams; +use crate::protocol::SignalResponse; use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; @@ -171,6 +173,14 @@ impl ExecServerHandler { session.process().exec_write(params).await } + pub(crate) async fn signal( + &self, + params: SignalParams, + ) -> Result { + let session = self.require_initialized_for("exec")?; + session.process().signal(params).await + } + pub(crate) async fn terminate( &self, params: TerminateParams, diff --git a/codex-rs/exec-server/src/server/process_handler.rs b/codex-rs/exec-server/src/server/process_handler.rs index 38fbace1c..9fced9c16 100644 --- a/codex-rs/exec-server/src/server/process_handler.rs +++ b/codex-rs/exec-server/src/server/process_handler.rs @@ -5,6 +5,8 @@ use crate::protocol::ExecParams; use crate::protocol::ExecResponse; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; +use crate::protocol::SignalParams; +use crate::protocol::SignalResponse; use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; @@ -49,6 +51,13 @@ impl ProcessHandler { self.process.exec_write(params).await } + pub(crate) async fn signal( + &self, + params: SignalParams, + ) -> Result { + self.process.signal_process(params).await + } + pub(crate) async fn terminate( &self, params: TerminateParams, diff --git a/codex-rs/exec-server/src/server/registry.rs b/codex-rs/exec-server/src/server/registry.rs index 26d2876c6..1f652c1c4 100644 --- a/codex-rs/exec-server/src/server/registry.rs +++ b/codex-rs/exec-server/src/server/registry.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::protocol::ENVIRONMENT_INFO_METHOD; use crate::protocol::EXEC_METHOD; use crate::protocol::EXEC_READ_METHOD; +use crate::protocol::EXEC_SIGNAL_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; use crate::protocol::EXEC_WRITE_METHOD; use crate::protocol::ExecParams; @@ -32,6 +33,7 @@ use crate::protocol::INITIALIZE_METHOD; use crate::protocol::INITIALIZED_METHOD; use crate::protocol::InitializeParams; use crate::protocol::ReadParams; +use crate::protocol::SignalParams; use crate::protocol::TerminateParams; use crate::protocol::WriteParams; use crate::rpc::RpcRouter; @@ -77,6 +79,12 @@ pub(crate) fn build_router() -> RpcRouter { handler.exec_write(params).await }, ); + router.request( + EXEC_SIGNAL_METHOD, + |handler: Arc, params: SignalParams| async move { + handler.signal(params).await + }, + ); router.request( EXEC_TERMINATE_METHOD, |handler: Arc, params: TerminateParams| async move { diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index e1f330fc4..af8e722c6 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -1,5 +1,3 @@ -#![cfg(unix)] - mod common; use std::sync::Arc; @@ -13,6 +11,7 @@ use codex_exec_server::ExecParams; use codex_exec_server::ExecProcess; use codex_exec_server::ExecProcessEvent; use codex_exec_server::ProcessId; +use codex_exec_server::ProcessSignal; use codex_exec_server::ReadResponse; use codex_exec_server::StartedExecProcess; use codex_exec_server::WriteStatus; @@ -505,6 +504,98 @@ async fn assert_exec_process_rejects_write_without_pipe_stdin(use_remote: bool) Ok(()) } +async fn assert_exec_process_signal_interrupts_process(use_remote: bool) -> Result<()> { + let context = create_process_context(use_remote).await?; + let process_id = "proc-signal".to_string(); + let session = context + .backend + .start(ExecParams { + process_id: process_id.clone().into(), + argv: vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "trap 'printf \"signal:2\\n\"; exit 7' INT; printf 'ready\\n'; while :; do :; done".to_string(), + ], + cwd: std::env::current_dir()?, + env_policy: /*env_policy*/ None, + env: Default::default(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await?; + assert_eq!(session.process.process_id().as_str(), process_id); + + let StartedExecProcess { process } = session; + let mut wake_rx = process.subscribe_wake(); + let mut ready_output = String::new(); + let mut after_seq = None; + loop { + let response = + read_process_until_change(Arc::clone(&process), &mut wake_rx, after_seq).await?; + for chunk in response.chunks { + ready_output.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner())); + after_seq = Some(chunk.seq); + } + if ready_output.contains("ready\n") { + break; + } + if response.closed { + anyhow::bail!("process closed before readiness marker: {ready_output:?}"); + } + after_seq = response.next_seq.checked_sub(1).or(after_seq); + } + + process.signal(ProcessSignal::Interrupt).await?; + let (output, exit_code, closed) = collect_process_output_from_reads(process, wake_rx).await?; + + assert!( + output.contains("signal:2"), + "expected signal handler output, got {output:?}" + ); + assert_eq!(exit_code, Some(7)); + assert!(closed); + Ok(()) +} + +async fn assert_exec_process_signal_reports_unsupported_on_windows(use_remote: bool) -> Result<()> { + let context = create_process_context(use_remote).await?; + let session = context + .backend + .start(ExecParams { + process_id: ProcessId::from("proc-windows-signal"), + argv: vec![ + "cmd".to_string(), + "/C".to_string(), + "echo ready && ping -n 30 127.0.0.1 >NUL".to_string(), + ], + cwd: std::env::current_dir()?, + env_policy: /*env_policy*/ None, + env: Default::default(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await?; + + let err = match session.process.signal(ProcessSignal::Interrupt).await { + Ok(()) => anyhow::bail!("Windows non-TTY signal should report unsupported"), + Err(err) => err, + }; + let message = err.to_string(); + assert!( + message.contains("failed to signal process"), + "unexpected signal error: {message}" + ); + assert!( + message.contains("process interrupt is not supported by this process backend"), + "unexpected signal error: {message}" + ); + + session.process.terminate().await?; + Ok(()) +} + async fn assert_exec_process_preserves_queued_events_before_subscribe( use_remote: bool, ) -> Result<()> { @@ -539,6 +630,7 @@ async fn assert_exec_process_preserves_queued_events_before_subscribe( } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] async fn remote_exec_process_reports_transport_disconnect() -> Result<()> { @@ -630,6 +722,7 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -639,6 +732,7 @@ async fn exec_process_starts_and_exits(use_remote: bool) -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -648,6 +742,7 @@ async fn exec_process_streams_output(use_remote: bool) -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -657,6 +752,7 @@ async fn exec_process_pushes_events(use_remote: bool) -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -666,6 +762,7 @@ async fn exec_process_replays_events_after_close(use_remote: bool) -> Result<()> #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -677,6 +774,7 @@ async fn exec_process_retains_output_after_exit_until_streams_close( #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -686,6 +784,7 @@ async fn exec_process_write_then_read(use_remote: bool) -> Result<()> { #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -695,6 +794,7 @@ async fn exec_process_write_then_read_without_tty(use_remote: bool) -> Result<() #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] @@ -704,6 +804,27 @@ async fn exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Resu #[test_case(false ; "local")] #[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// Serialize tests that launch a real exec-server process through the full CLI. +#[serial_test::serial(remote_exec_server)] +async fn exec_process_signal_interrupts_process(use_remote: bool) -> Result<()> { + assert_exec_process_signal_interrupts_process(use_remote).await +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[cfg_attr(not(windows), ignore = "Windows-only exec-server process test")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// Serialize tests that launch a real exec-server process through the full CLI. +#[serial_test::serial(remote_exec_server)] +async fn exec_process_signal_reports_unsupported_on_windows(use_remote: bool) -> Result<()> { + assert_exec_process_signal_reports_unsupported_on_windows(use_remote).await +} + +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // Serialize tests that launch a real exec-server process through the full CLI. #[serial_test::serial(remote_exec_server)] diff --git a/codex-rs/utils/pty/src/lib.rs b/codex-rs/utils/pty/src/lib.rs index 39fc9b552..9f0cb442a 100644 --- a/codex-rs/utils/pty/src/lib.rs +++ b/codex-rs/utils/pty/src/lib.rs @@ -17,6 +17,8 @@ pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin; pub use process::ProcessDriver; /// Handle for interacting with a spawned process (PTY or pipe). pub use process::ProcessHandle; +/// Process signal supported by spawned-process handles. +pub use process::ProcessSignal; /// Bundle of process handles plus split output and exit receivers returned by spawn helpers. pub use process::SpawnedProcess; /// Terminal size in character cells used for PTY spawn and resize operations. diff --git a/codex-rs/utils/pty/src/pipe.rs b/codex-rs/utils/pty/src/pipe.rs index 541a2ecf2..3a9b62d9b 100644 --- a/codex-rs/utils/pty/src/pipe.rs +++ b/codex-rs/utils/pty/src/pipe.rs @@ -19,7 +19,9 @@ use tokio::task::JoinHandle; use crate::process::ChildTerminator; use crate::process::ProcessHandle; +use crate::process::ProcessSignal; use crate::process::SpawnedProcess; +use crate::process::exit_code_from_status; #[cfg(target_os = "linux")] use libc; @@ -32,6 +34,22 @@ struct PipeChildTerminator { } impl ChildTerminator for PipeChildTerminator { + fn signal(&mut self, signal: ProcessSignal) -> io::Result<()> { + match signal { + ProcessSignal::Interrupt => { + #[cfg(unix)] + { + crate::process_group::interrupt_process_group(self.process_group_id) + } + + #[cfg(not(unix))] + { + Err(crate::process::unsupported_signal(signal)) + } + } + } + } + fn kill(&mut self) -> io::Result<()> { #[cfg(unix)] { @@ -209,7 +227,7 @@ async fn spawn_process_with_stdin_mode( let wait_exit_code = Arc::clone(&exit_code); let wait_handle: JoinHandle<()> = tokio::spawn(async move { let code = match child.wait().await { - Ok(status) => status.code().unwrap_or(-1), + Ok(status) => exit_code_from_status(status), Err(_) => -1, }; wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst); diff --git a/codex-rs/utils/pty/src/process.rs b/codex-rs/utils/pty/src/process.rs index 898a3d90f..fdd693f43 100644 --- a/codex-rs/utils/pty/src/process.rs +++ b/codex-rs/utils/pty/src/process.rs @@ -2,6 +2,7 @@ use core::fmt; use std::io; #[cfg(unix)] use std::os::fd::RawFd; +use std::process::ExitStatus; use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::sync::atomic::AtomicBool; @@ -17,7 +18,39 @@ use tokio::sync::watch; use tokio::task::AbortHandle; use tokio::task::JoinHandle; +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ProcessSignal { + Interrupt, +} + +pub(crate) fn unsupported_signal(signal: ProcessSignal) -> io::Error { + match signal { + ProcessSignal::Interrupt => io::Error::new( + io::ErrorKind::Unsupported, + "process interrupt is not supported by this process backend", + ), + } +} + +pub(crate) fn exit_code_from_status(status: ExitStatus) -> i32 { + if let Some(code) = status.code() { + return code; + } + + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + if let Some(signal) = status.signal() { + return 128 + signal; + } + } + + -1 +} + pub(crate) trait ChildTerminator: Send + Sync { + fn signal(&mut self, signal: ProcessSignal) -> io::Result<()>; + fn kill(&mut self) -> io::Result<()>; } @@ -193,6 +226,17 @@ impl ProcessHandle { } } + pub fn signal(&self, signal: ProcessSignal) -> io::Result<()> { + let Ok(mut killer_opt) = self.killer.lock() else { + return Ok(()); + }; + let Some(killer) = killer_opt.as_mut() else { + return Ok(()); + }; + + killer.signal(signal) + } + /// Attempts to kill the child and abort helper tasks. pub fn terminate(&self) { self.request_terminate(); @@ -232,6 +276,10 @@ struct ClosureTerminator { } impl ChildTerminator for ClosureTerminator { + fn signal(&mut self, signal: ProcessSignal) -> io::Result<()> { + Err(unsupported_signal(signal)) + } + fn kill(&mut self) -> io::Result<()> { if let Some(inner) = self.inner.as_mut() { (inner)(); diff --git a/codex-rs/utils/pty/src/process_group.rs b/codex-rs/utils/pty/src/process_group.rs index 22a934fab..9d10d7c36 100644 --- a/codex-rs/utils/pty/src/process_group.rs +++ b/codex-rs/utils/pty/src/process_group.rs @@ -118,15 +118,10 @@ pub fn kill_process_group_by_pid(_pid: u32) -> io::Result<()> { } #[cfg(unix)] -/// Send SIGTERM to a specific process group ID (best-effort). -/// -/// Returns `Ok(true)` when SIGTERM was delivered to an existing group and -/// `Ok(false)` when the group no longer exists. -pub fn terminate_process_group(process_group_id: u32) -> io::Result { +fn signal_process_group_id(pgid: libc::pid_t, signal: libc::c_int) -> io::Result { use std::io::ErrorKind; - let pgid = process_group_id as libc::pid_t; - let result = unsafe { libc::killpg(pgid, libc::SIGTERM) }; + let result = unsafe { libc::killpg(pgid, signal) }; if result == -1 { let err = io::Error::last_os_error(); if err.kind() == ErrorKind::NotFound || err.raw_os_error() == Some(libc::ESRCH) { @@ -138,27 +133,37 @@ pub fn terminate_process_group(process_group_id: u32) -> io::Result { Ok(true) } +#[cfg(unix)] +/// Send SIGTERM to a specific process group ID (best-effort). +/// +/// Returns `Ok(true)` when SIGTERM was delivered to an existing group and +/// `Ok(false)` when the group no longer exists. +pub fn terminate_process_group(process_group_id: u32) -> io::Result { + signal_process_group_id(process_group_id as libc::pid_t, libc::SIGTERM) +} + #[cfg(not(unix))] /// No-op on non-Unix platforms. pub fn terminate_process_group(_process_group_id: u32) -> io::Result { Ok(false) } +#[cfg(unix)] +/// Send SIGINT to a specific process group ID (best-effort). +pub fn interrupt_process_group(process_group_id: u32) -> io::Result<()> { + signal_process_group_id(process_group_id as libc::pid_t, libc::SIGINT).map(|_| ()) +} + +#[cfg(not(unix))] +/// No-op on non-Unix platforms. +pub fn interrupt_process_group(_process_group_id: u32) -> io::Result<()> { + Ok(()) +} + #[cfg(unix)] /// Kill a specific process group ID (best-effort). pub fn kill_process_group(process_group_id: u32) -> io::Result<()> { - use std::io::ErrorKind; - - let pgid = process_group_id as libc::pid_t; - let result = unsafe { libc::killpg(pgid, libc::SIGKILL) }; - if result == -1 { - let err = io::Error::last_os_error(); - if err.kind() != ErrorKind::NotFound && err.raw_os_error() != Some(libc::ESRCH) { - return Err(err); - } - } - - Ok(()) + signal_process_group_id(process_group_id as libc::pid_t, libc::SIGKILL).map(|_| ()) } #[cfg(not(unix))] diff --git a/codex-rs/utils/pty/src/pty.rs b/codex-rs/utils/pty/src/pty.rs index 45c587b32..951818a80 100644 --- a/codex-rs/utils/pty/src/pty.rs +++ b/codex-rs/utils/pty/src/pty.rs @@ -30,10 +30,13 @@ use tokio::task::JoinHandle; use crate::process::ChildTerminator; use crate::process::ProcessHandle; +use crate::process::ProcessSignal; use crate::process::PtyHandles; use crate::process::PtyMasterHandle; use crate::process::SpawnedProcess; use crate::process::TerminalSize; +#[cfg(unix)] +use crate::process::exit_code_from_status; /// Returns true when ConPTY support is available (Windows only). #[cfg(windows)] @@ -54,6 +57,19 @@ struct PtyChildTerminator { } impl ChildTerminator for PtyChildTerminator { + fn signal(&mut self, signal: ProcessSignal) -> std::io::Result<()> { + match signal { + ProcessSignal::Interrupt => { + #[cfg(unix)] + if let Some(process_group_id) = self.process_group_id { + return crate::process_group::interrupt_process_group(process_group_id); + } + + Err(crate::process::unsupported_signal(signal)) + } + } + } + fn kill(&mut self) -> std::io::Result<()> { #[cfg(unix)] if let Some(process_group_id) = self.process_group_id { @@ -81,6 +97,14 @@ struct RawPidTerminator { #[cfg(unix)] impl ChildTerminator for RawPidTerminator { + fn signal(&mut self, signal: ProcessSignal) -> std::io::Result<()> { + match signal { + ProcessSignal::Interrupt => { + crate::process_group::interrupt_process_group(self.process_group_id) + } + } + } + fn kill(&mut self) -> std::io::Result<()> { crate::process_group::kill_process_group(self.process_group_id) } @@ -368,7 +392,7 @@ async fn spawn_process_preserving_fds( let wait_exit_code = Arc::clone(&exit_code); let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || { let code = match child.wait() { - Ok(status) => status.code().unwrap_or(-1), + Ok(status) => exit_code_from_status(status), Err(_) => -1, }; wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);