[codex] Handle Ctrl-C for non-TTY unified exec (#26734)

## Why

A long-running unified exec process started with `tty: false` could not
be interrupted via `write_stdin`: ordinary non-TTY stdin writes are
rejected once stdin is closed, but an exact U+0003 payload should still
map to a process interrupt. The interrupt should flow through the same
process lifecycle path as a real signal so Codex preserves
process-reported output and exit metadata instead of fabricating a
Ctrl-C exit code or tearing down the session early.

## What Changed

- Add `process/signal` to exec-server with `ProcessSignal::Interrupt`
and an empty response.
- Add a non-consuming `ProcessHandle::signal` path for spawned
processes; on Unix it sends SIGINT to the process group and leaves
terminate/hard-kill unchanged.
- Route non-TTY U+0003 `write_stdin` through `process.signal(...)`
instead of `terminate`, then let the normal post-write collection path
drain output and observe exit.
- Add exec-server coverage where a shell `trap INT` handler prints the
signal and exits with its own code.
- Add unified exec coverage where a `tty: false` process traps SIGINT,
emits output, and exits with its own code.

## Validation

- `just test -p codex-exec-server
exec_process_signal_interrupts_process`
- `just test -p codex-exec-server`
- `just test -p codex-core
write_stdin_ctrl_c_interrupts_non_tty_session`
This commit is contained in:
pakrym-oai
2026-06-09 15:10:17 -07:00
committed by GitHub
Unverified
parent f574946960
commit f2969f36e8
19 changed files with 659 additions and 44 deletions
+14 -1
View File
@@ -14,6 +14,7 @@ use tokio_util::sync::CancellationToken;
use crate::exec::is_likely_sandbox_denied;
use codex_exec_server::ExecProcess;
use codex_exec_server::ProcessSignal as ExecServerProcessSignal;
use codex_exec_server::ReadResponse as ExecReadResponse;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteStatus;
@@ -23,6 +24,7 @@ use codex_protocol::protocol::TruncationPolicy;
use codex_sandboxing::SandboxType;
use codex_utils_output_truncation::formatted_truncate_text;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::ProcessSignal as PtyProcessSignal;
use codex_utils_pty::SpawnedPty;
use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS;
@@ -31,7 +33,6 @@ use super::head_tail_buffer::HeadTailBuffer;
use super::process_state::ProcessState;
const EARLY_EXIT_GRACE_PERIOD: Duration = Duration::from_millis(150);
pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync {
/// Returns file descriptors that must stay open across the child `exec()`.
///
@@ -212,6 +213,18 @@ impl UnifiedExecProcess {
}
}
pub(super) async fn interrupt(&self) -> Result<(), UnifiedExecError> {
match &self.process_handle {
ProcessHandle::Local(process_handle) => process_handle
.signal(PtyProcessSignal::Interrupt)
.map_err(|err| UnifiedExecError::process_failed(err.to_string())),
ProcessHandle::ExecServer(process_handle) => process_handle
.signal(ExecServerProcessSignal::Interrupt)
.await
.map_err(|err| UnifiedExecError::process_failed(err.to_string())),
}
}
pub(super) fn fail_and_terminate(&self, message: String) {
let state = self.state_rx.borrow().clone();
if state.failure_message.is_none() {
@@ -72,6 +72,7 @@ const UNIFIED_EXEC_ENV: [(&str, &str); 10] = [
const NETWORK_ACCESS_DENIED_MESSAGE: &str =
"Network access was denied by the Codex sandbox network proxy.";
const LATE_NETWORK_DENIAL_GRACE_PERIOD: Duration = Duration::from_millis(100);
const INTERRUPT: &str = "\u{3}";
/// Test-only override for deterministic unified exec process IDs.
///
@@ -617,24 +618,29 @@ impl UnifiedExecProcessManager {
if !request.input.is_empty() {
if !tty {
return Err(UnifiedExecError::StdinClosed);
}
match process.write(request.input.as_bytes()).await {
Ok(()) => {
// Give the remote process a brief window to react so that we are
// more likely to capture its output in the poll below.
tokio::time::sleep(Duration::from_millis(100)).await;
if request.input == INTERRUPT {
process.interrupt().await?;
} else {
return Err(UnifiedExecError::StdinClosed);
}
Err(err) => {
let status = self.refresh_process_state(process_id).await;
if matches!(status, ProcessStatus::Exited { .. }) {
status_after_write = Some(status);
} else if matches!(err, UnifiedExecError::ProcessFailed { .. }) {
process.terminate();
self.release_process_id(process_id).await;
return Err(err);
} else {
return Err(err);
} else {
match process.write(request.input.as_bytes()).await {
Ok(()) => {
// Give the remote process a brief window to react so that we are
// more likely to capture its output in the poll below.
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(err) => {
let status = self.refresh_process_state(process_id).await;
if matches!(status, ProcessStatus::Exited { .. }) {
status_after_write = Some(status);
} else if matches!(err, UnifiedExecError::ProcessFailed { .. }) {
process.terminate();
self.release_process_id(process_id).await;
return Err(err);
} else {
return Err(err);
}
}
}
}
@@ -5,6 +5,7 @@ use codex_exec_server::ExecProcess;
use codex_exec_server::ExecProcessEventReceiver;
use codex_exec_server::ExecServerError;
use codex_exec_server::ProcessId;
use codex_exec_server::ProcessSignal;
use codex_exec_server::ReadResponse;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteResponse;
@@ -63,6 +64,10 @@ impl ExecProcess for MockExecProcess {
Ok(self.write_response.clone())
}
async fn signal(&self, _signal: ProcessSignal) -> Result<(), ExecServerError> {
Ok(())
}
async fn terminate(&self) -> Result<(), ExecServerError> {
Ok(())
}
+243
View File
@@ -1995,6 +1995,249 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_stdin_ctrl_c_interrupts_non_tty_session() -> Result<()> {
assert_write_stdin_ctrl_c_interrupts_non_tty_session(
"trap",
"trap 'echo INT-TRAP; exit 42' INT; echo READY; while true; do sleep 30; done",
/*expected_exit_code*/ 42,
Some("INT-TRAP"),
)
.await
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_stdin_ctrl_c_default_interrupt_reports_130_for_non_tty_session() -> Result<()> {
assert_write_stdin_ctrl_c_interrupts_non_tty_session(
"default",
"echo READY; exec sleep 30",
/*expected_exit_code*/ 130,
/*expected_interrupt_output*/ None,
)
.await
}
async fn assert_write_stdin_ctrl_c_interrupts_non_tty_session(
test_name: &str,
command: &str,
expected_exit_code: i32,
expected_interrupt_output: Option<&str>,
) -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
if let Err(err) = config.features.enable(Feature::UnifiedExec) {
panic!("test config should allow feature update: {err}");
}
});
let test = builder.build_with_remote_env(&server).await?;
let start_call_id = format!("uexec-non-tty-interrupt-{test_name}-start");
let interrupt_call_id = format!("uexec-non-tty-interrupt-{test_name}");
let start_args = serde_json::json!({
"cmd": command,
"yield_time_ms": 250,
"tty": false,
});
let interrupt_args = serde_json::json!({
"chars": "\u{3}",
"session_id": 1000,
"yield_time_ms": 1000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
&start_call_id,
"exec_command",
&serde_json::to_string(&start_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
&interrupt_call_id,
"write_stdin",
&serde_json::to_string(&interrupt_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"interrupt non-tty unified exec",
PermissionProfile::Disabled,
)
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let start_output = outputs
.get(&start_call_id)
.with_context(|| format!("missing start output for exec_command {start_call_id}"))?;
assert_eq!(
start_output.process_id.as_deref(),
Some("1000"),
"exec_command should leave a running non-TTY session"
);
assert!(
start_output.exit_code.is_none(),
"initial exec_command should not include exit_code while session is running"
);
assert!(
start_output.output.contains("READY"),
"start output should include command readiness marker, got {:?}",
start_output.output
);
let interrupt_output = outputs
.get(&interrupt_call_id)
.with_context(|| format!("missing interrupt output for write_stdin {interrupt_call_id}"))?;
assert!(
interrupt_output.process_id.is_none(),
"interrupted process should be cleared from the session map"
);
assert_eq!(
interrupt_output.exit_code,
Some(expected_exit_code),
"interrupt should preserve the process-reported exit code"
);
if let Some(expected_interrupt_output) = expected_interrupt_output {
assert!(
interrupt_output.output.contains(expected_interrupt_output),
"interrupt should drain output from the signal handler, got {:?}",
interrupt_output.output
);
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg_attr(not(windows), ignore = "Windows-only unified exec interrupt test")]
async fn write_stdin_ctrl_c_reports_unsupported_interrupt_to_model_on_windows() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::UnifiedExec)
.expect("test config should allow feature update");
});
let test = builder.build_with_remote_env(&server).await?;
let start_call_id = "uexec-windows-interrupt-start";
let interrupt_call_id = "uexec-windows-interrupt";
let start_args = serde_json::json!({
"shell": "cmd",
"cmd": "echo READY && ping -n 30 127.0.0.1 >NUL",
"yield_time_ms": 250,
"tty": false,
});
let interrupt_args = serde_json::json!({
"chars": "\u{3}",
"session_id": 1000,
"yield_time_ms": 1000,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
start_call_id,
"exec_command",
&serde_json::to_string(&start_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
interrupt_call_id,
"write_stdin",
&serde_json::to_string(&interrupt_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
submit_unified_exec_turn(
&test,
"interrupt non-tty unified exec on Windows",
PermissionProfile::Disabled,
)
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let start_output = request_log
.function_call_output_text(start_call_id)
.expect("missing start output for exec_command");
let start_output = parse_unified_exec_output(&start_output)?;
assert_eq!(
start_output.process_id.as_deref(),
Some("1000"),
"exec_command should leave a running non-TTY session"
);
assert!(
start_output.output.contains("READY"),
"start output should include command readiness marker, got {:?}",
start_output.output
);
let interrupt_output = request_log
.function_call_output_text(interrupt_call_id)
.expect("missing interrupt output for write_stdin");
assert!(
interrupt_output.contains("write_stdin failed"),
"model-visible write_stdin output should report failure, got {interrupt_output:?}"
);
assert!(
interrupt_output.contains("process interrupt is not supported by this process backend"),
"model-visible write_stdin output should explain unsupported interrupt, got {interrupt_output:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));
+25
View File
@@ -35,6 +35,7 @@ use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_SIGNAL_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::EnvironmentInfo;
@@ -80,8 +81,11 @@ use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ProcessSignal;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::SignalParams;
use crate::protocol::SignalResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -394,6 +398,23 @@ impl ExecServerClient {
.await
}
pub async fn signal(
&self,
process_id: &ProcessId,
signal: ProcessSignal,
) -> Result<(), ExecServerError> {
let _response: SignalResponse = self
.call(
EXEC_SIGNAL_METHOD,
&SignalParams {
process_id: process_id.clone(),
signal,
},
)
.await?;
Ok(())
}
pub async fn terminate(
&self,
process_id: &ProcessId,
@@ -763,6 +784,10 @@ impl Session {
self.client.write(&self.process_id, chunk).await
}
pub(crate) async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError> {
self.client.signal(&self.process_id, signal).await
}
pub(crate) async fn terminate(&self) -> Result<(), ExecServerError> {
self.client.terminate(&self.process_id).await?;
Ok(())
+3
View File
@@ -93,9 +93,12 @@ pub use protocol::HttpRequestResponse;
pub use protocol::InitializeParams;
pub use protocol::InitializeResponse;
pub use protocol::ProcessOutputChunk;
pub use protocol::ProcessSignal;
pub use protocol::ReadParams;
pub use protocol::ReadResponse;
pub use protocol::ShellInfo;
pub use protocol::SignalParams;
pub use protocol::SignalResponse;
pub use protocol::TerminateParams;
pub use protocol::TerminateResponse;
pub use protocol::WriteParams;
+51 -3
View File
@@ -10,6 +10,7 @@ use codex_protocol::config_types::EnvironmentVariablePattern;
use codex_protocol::config_types::ShellEnvironmentPolicy;
use codex_protocol::shell_environment;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::ProcessSignal as PtyProcessSignal;
use codex_utils_pty::TerminalSize;
use tokio::sync::Mutex;
use tokio::sync::Notify;
@@ -33,8 +34,11 @@ use crate::protocol::ExecOutputStream;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ProcessSignal;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::SignalParams;
use crate::protocol::SignalResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -272,7 +276,6 @@ impl LocalProcess {
&self,
params: ReadParams,
) -> Result<ReadResponse, JSONRPCErrorError> {
let _process_id = params.process_id.clone();
let after_seq = params.after_seq.unwrap_or(0);
let max_bytes = params.max_bytes.unwrap_or(usize::MAX);
let wait = Duration::from_millis(params.wait_ms.unwrap_or(0));
@@ -351,7 +354,6 @@ impl LocalProcess {
&self,
params: WriteParams,
) -> Result<WriteResponse, JSONRPCErrorError> {
let _process_id = params.process_id.clone();
let _input_bytes = params.chunk.0.len();
let writer_tx = {
let process_map = self.inner.processes.lock().await;
@@ -383,11 +385,33 @@ impl LocalProcess {
})
}
pub(crate) async fn signal_process(
&self,
params: SignalParams,
) -> Result<SignalResponse, JSONRPCErrorError> {
{
let process_map = self.inner.processes.lock().await;
match process_map.get(&params.process_id) {
Some(ProcessEntry::Running(process)) => {
if process.exit_code.is_some() {
return Ok(SignalResponse {});
}
process
.session
.signal(pty_process_signal(params.signal))
.map_err(|err| internal_error(format!("failed to signal process: {err}")))?
}
Some(ProcessEntry::Starting) | None => {}
}
}
Ok(SignalResponse {})
}
pub(crate) async fn terminate_process(
&self,
params: TerminateParams,
) -> Result<TerminateResponse, JSONRPCErrorError> {
let _process_id = params.process_id.clone();
let running = {
let process_map = self.inner.processes.lock().await;
match process_map.get(&params.process_id) {
@@ -483,6 +507,10 @@ impl ExecProcess for LocalExecProcess {
self.backend.write(&self.process_id, chunk).await
}
async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError> {
self.backend.signal(&self.process_id, signal).await
}
async fn terminate(&self) -> Result<(), ExecServerError> {
self.backend.terminate(&self.process_id).await
}
@@ -519,6 +547,20 @@ impl LocalProcess {
.map_err(map_handler_error)
}
async fn signal(
&self,
process_id: &ProcessId,
signal: ProcessSignal,
) -> Result<(), ExecServerError> {
self.signal_process(SignalParams {
process_id: process_id.clone(),
signal,
})
.await
.map_err(map_handler_error)?;
Ok(())
}
async fn terminate(&self, process_id: &ProcessId) -> Result<(), ExecServerError> {
self.terminate_process(TerminateParams {
process_id: process_id.clone(),
@@ -529,6 +571,12 @@ impl LocalProcess {
}
}
fn pty_process_signal(signal: ProcessSignal) -> PtyProcessSignal {
match signal {
ProcessSignal::Interrupt => PtyProcessSignal::Interrupt,
}
}
fn map_handler_error(error: JSONRPCErrorError) -> ExecServerError {
ExecServerError::Server {
code: error.code,
+3
View File
@@ -10,6 +10,7 @@ use crate::ExecServerError;
use crate::ProcessId;
use crate::protocol::ExecParams;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ProcessSignal;
use crate::protocol::ReadResponse;
use crate::protocol::WriteResponse;
@@ -178,6 +179,8 @@ pub trait ExecProcess: Send + Sync {
async fn write(&self, chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError>;
async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError>;
async fn terminate(&self) -> Result<(), ExecServerError>;
}
+18
View File
@@ -15,6 +15,7 @@ pub const INITIALIZED_METHOD: &str = "initialized";
pub const EXEC_METHOD: &str = "process/start";
pub const EXEC_READ_METHOD: &str = "process/read";
pub const EXEC_WRITE_METHOD: &str = "process/write";
pub const EXEC_SIGNAL_METHOD: &str = "process/signal";
pub const EXEC_TERMINATE_METHOD: &str = "process/terminate";
pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output";
pub const EXEC_EXITED_METHOD: &str = "process/exited";
@@ -166,6 +167,23 @@ pub struct WriteResponse {
pub status: WriteStatus,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ProcessSignal {
Interrupt,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SignalParams {
pub process_id: ProcessId,
pub signal: ProcessSignal,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SignalResponse {}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TerminateParams {
@@ -12,6 +12,7 @@ use crate::StartedExecProcess;
use crate::client::LazyRemoteExecServerClient;
use crate::client::Session;
use crate::protocol::ExecParams;
use crate::protocol::ProcessSignal;
use crate::protocol::ReadResponse;
use crate::protocol::WriteResponse;
@@ -76,6 +77,11 @@ impl ExecProcess for RemoteExecProcess {
self.session.write(chunk).await
}
async fn signal(&self, signal: ProcessSignal) -> Result<(), ExecServerError> {
trace!("exec process signal");
self.session.signal(signal).await
}
async fn terminate(&self) -> Result<(), ExecServerError> {
trace!("exec process terminate");
self.session.terminate().await
@@ -42,6 +42,8 @@ use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::SignalParams;
use crate::protocol::SignalResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -171,6 +173,14 @@ impl ExecServerHandler {
session.process().exec_write(params).await
}
pub(crate) async fn signal(
&self,
params: SignalParams,
) -> Result<SignalResponse, JSONRPCErrorError> {
let session = self.require_initialized_for("exec")?;
session.process().signal(params).await
}
pub(crate) async fn terminate(
&self,
params: TerminateParams,
@@ -5,6 +5,8 @@ use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::SignalParams;
use crate::protocol::SignalResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
@@ -49,6 +51,13 @@ impl ProcessHandler {
self.process.exec_write(params).await
}
pub(crate) async fn signal(
&self,
params: SignalParams,
) -> Result<SignalResponse, JSONRPCErrorError> {
self.process.signal_process(params).await
}
pub(crate) async fn terminate(
&self,
params: TerminateParams,
@@ -3,6 +3,7 @@ use std::sync::Arc;
use crate::protocol::ENVIRONMENT_INFO_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_SIGNAL_METHOD;
use crate::protocol::EXEC_TERMINATE_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
use crate::protocol::ExecParams;
@@ -32,6 +33,7 @@ use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::ReadParams;
use crate::protocol::SignalParams;
use crate::protocol::TerminateParams;
use crate::protocol::WriteParams;
use crate::rpc::RpcRouter;
@@ -77,6 +79,12 @@ pub(crate) fn build_router() -> RpcRouter<ExecServerHandler> {
handler.exec_write(params).await
},
);
router.request(
EXEC_SIGNAL_METHOD,
|handler: Arc<ExecServerHandler>, params: SignalParams| async move {
handler.signal(params).await
},
);
router.request(
EXEC_TERMINATE_METHOD,
|handler: Arc<ExecServerHandler>, params: TerminateParams| async move {
+123 -2
View File
@@ -1,5 +1,3 @@
#![cfg(unix)]
mod common;
use std::sync::Arc;
@@ -13,6 +11,7 @@ use codex_exec_server::ExecParams;
use codex_exec_server::ExecProcess;
use codex_exec_server::ExecProcessEvent;
use codex_exec_server::ProcessId;
use codex_exec_server::ProcessSignal;
use codex_exec_server::ReadResponse;
use codex_exec_server::StartedExecProcess;
use codex_exec_server::WriteStatus;
@@ -505,6 +504,98 @@ async fn assert_exec_process_rejects_write_without_pipe_stdin(use_remote: bool)
Ok(())
}
async fn assert_exec_process_signal_interrupts_process(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let process_id = "proc-signal".to_string();
let session = context
.backend
.start(ExecParams {
process_id: process_id.clone().into(),
argv: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"trap 'printf \"signal:2\\n\"; exit 7' INT; printf 'ready\\n'; while :; do :; done".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
assert_eq!(session.process.process_id().as_str(), process_id);
let StartedExecProcess { process } = session;
let mut wake_rx = process.subscribe_wake();
let mut ready_output = String::new();
let mut after_seq = None;
loop {
let response =
read_process_until_change(Arc::clone(&process), &mut wake_rx, after_seq).await?;
for chunk in response.chunks {
ready_output.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner()));
after_seq = Some(chunk.seq);
}
if ready_output.contains("ready\n") {
break;
}
if response.closed {
anyhow::bail!("process closed before readiness marker: {ready_output:?}");
}
after_seq = response.next_seq.checked_sub(1).or(after_seq);
}
process.signal(ProcessSignal::Interrupt).await?;
let (output, exit_code, closed) = collect_process_output_from_reads(process, wake_rx).await?;
assert!(
output.contains("signal:2"),
"expected signal handler output, got {output:?}"
);
assert_eq!(exit_code, Some(7));
assert!(closed);
Ok(())
}
async fn assert_exec_process_signal_reports_unsupported_on_windows(use_remote: bool) -> Result<()> {
let context = create_process_context(use_remote).await?;
let session = context
.backend
.start(ExecParams {
process_id: ProcessId::from("proc-windows-signal"),
argv: vec![
"cmd".to_string(),
"/C".to_string(),
"echo ready && ping -n 30 127.0.0.1 >NUL".to_string(),
],
cwd: std::env::current_dir()?,
env_policy: /*env_policy*/ None,
env: Default::default(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await?;
let err = match session.process.signal(ProcessSignal::Interrupt).await {
Ok(()) => anyhow::bail!("Windows non-TTY signal should report unsupported"),
Err(err) => err,
};
let message = err.to_string();
assert!(
message.contains("failed to signal process"),
"unexpected signal error: {message}"
);
assert!(
message.contains("process interrupt is not supported by this process backend"),
"unexpected signal error: {message}"
);
session.process.terminate().await?;
Ok(())
}
async fn assert_exec_process_preserves_queued_events_before_subscribe(
use_remote: bool,
) -> Result<()> {
@@ -539,6 +630,7 @@ async fn assert_exec_process_preserves_queued_events_before_subscribe(
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
@@ -630,6 +722,7 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> {
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
@@ -639,6 +732,7 @@ async fn exec_process_starts_and_exits(use_remote: bool) -> Result<()> {
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
@@ -648,6 +742,7 @@ async fn exec_process_streams_output(use_remote: bool) -> Result<()> {
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
@@ -657,6 +752,7 @@ async fn exec_process_pushes_events(use_remote: bool) -> Result<()> {
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
@@ -666,6 +762,7 @@ async fn exec_process_replays_events_after_close(use_remote: bool) -> Result<()>
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
@@ -677,6 +774,7 @@ async fn exec_process_retains_output_after_exit_until_streams_close(
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
@@ -686,6 +784,7 @@ async fn exec_process_write_then_read(use_remote: bool) -> Result<()> {
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
@@ -695,6 +794,7 @@ async fn exec_process_write_then_read_without_tty(use_remote: bool) -> Result<()
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
@@ -704,6 +804,27 @@ async fn exec_process_rejects_write_without_pipe_stdin(use_remote: bool) -> Resu
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_signal_interrupts_process(use_remote: bool) -> Result<()> {
assert_exec_process_signal_interrupts_process(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(windows), ignore = "Windows-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
async fn exec_process_signal_reports_unsupported_on_windows(use_remote: bool) -> Result<()> {
assert_exec_process_signal_reports_unsupported_on_windows(use_remote).await
}
#[test_case(false ; "local")]
#[test_case(true ; "remote")]
#[cfg_attr(not(unix), ignore = "Unix-only exec-server process test")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch a real exec-server process through the full CLI.
#[serial_test::serial(remote_exec_server)]
+2
View File
@@ -17,6 +17,8 @@ pub use pipe::spawn_process_no_stdin as spawn_pipe_process_no_stdin;
pub use process::ProcessDriver;
/// Handle for interacting with a spawned process (PTY or pipe).
pub use process::ProcessHandle;
/// Process signal supported by spawned-process handles.
pub use process::ProcessSignal;
/// Bundle of process handles plus split output and exit receivers returned by spawn helpers.
pub use process::SpawnedProcess;
/// Terminal size in character cells used for PTY spawn and resize operations.
+19 -1
View File
@@ -19,7 +19,9 @@ use tokio::task::JoinHandle;
use crate::process::ChildTerminator;
use crate::process::ProcessHandle;
use crate::process::ProcessSignal;
use crate::process::SpawnedProcess;
use crate::process::exit_code_from_status;
#[cfg(target_os = "linux")]
use libc;
@@ -32,6 +34,22 @@ struct PipeChildTerminator {
}
impl ChildTerminator for PipeChildTerminator {
fn signal(&mut self, signal: ProcessSignal) -> io::Result<()> {
match signal {
ProcessSignal::Interrupt => {
#[cfg(unix)]
{
crate::process_group::interrupt_process_group(self.process_group_id)
}
#[cfg(not(unix))]
{
Err(crate::process::unsupported_signal(signal))
}
}
}
}
fn kill(&mut self) -> io::Result<()> {
#[cfg(unix)]
{
@@ -209,7 +227,7 @@ async fn spawn_process_with_stdin_mode(
let wait_exit_code = Arc::clone(&exit_code);
let wait_handle: JoinHandle<()> = tokio::spawn(async move {
let code = match child.wait().await {
Ok(status) => status.code().unwrap_or(-1),
Ok(status) => exit_code_from_status(status),
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
+48
View File
@@ -2,6 +2,7 @@ use core::fmt;
use std::io;
#[cfg(unix)]
use std::os::fd::RawFd;
use std::process::ExitStatus;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
@@ -17,7 +18,39 @@ use tokio::sync::watch;
use tokio::task::AbortHandle;
use tokio::task::JoinHandle;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ProcessSignal {
Interrupt,
}
pub(crate) fn unsupported_signal(signal: ProcessSignal) -> io::Error {
match signal {
ProcessSignal::Interrupt => io::Error::new(
io::ErrorKind::Unsupported,
"process interrupt is not supported by this process backend",
),
}
}
pub(crate) fn exit_code_from_status(status: ExitStatus) -> i32 {
if let Some(code) = status.code() {
return code;
}
#[cfg(unix)]
{
use std::os::unix::process::ExitStatusExt;
if let Some(signal) = status.signal() {
return 128 + signal;
}
}
-1
}
pub(crate) trait ChildTerminator: Send + Sync {
fn signal(&mut self, signal: ProcessSignal) -> io::Result<()>;
fn kill(&mut self) -> io::Result<()>;
}
@@ -193,6 +226,17 @@ impl ProcessHandle {
}
}
pub fn signal(&self, signal: ProcessSignal) -> io::Result<()> {
let Ok(mut killer_opt) = self.killer.lock() else {
return Ok(());
};
let Some(killer) = killer_opt.as_mut() else {
return Ok(());
};
killer.signal(signal)
}
/// Attempts to kill the child and abort helper tasks.
pub fn terminate(&self) {
self.request_terminate();
@@ -232,6 +276,10 @@ struct ClosureTerminator {
}
impl ChildTerminator for ClosureTerminator {
fn signal(&mut self, signal: ProcessSignal) -> io::Result<()> {
Err(unsupported_signal(signal))
}
fn kill(&mut self) -> io::Result<()> {
if let Some(inner) = self.inner.as_mut() {
(inner)();
+24 -19
View File
@@ -118,15 +118,10 @@ pub fn kill_process_group_by_pid(_pid: u32) -> io::Result<()> {
}
#[cfg(unix)]
/// Send SIGTERM to a specific process group ID (best-effort).
///
/// Returns `Ok(true)` when SIGTERM was delivered to an existing group and
/// `Ok(false)` when the group no longer exists.
pub fn terminate_process_group(process_group_id: u32) -> io::Result<bool> {
fn signal_process_group_id(pgid: libc::pid_t, signal: libc::c_int) -> io::Result<bool> {
use std::io::ErrorKind;
let pgid = process_group_id as libc::pid_t;
let result = unsafe { libc::killpg(pgid, libc::SIGTERM) };
let result = unsafe { libc::killpg(pgid, signal) };
if result == -1 {
let err = io::Error::last_os_error();
if err.kind() == ErrorKind::NotFound || err.raw_os_error() == Some(libc::ESRCH) {
@@ -138,27 +133,37 @@ pub fn terminate_process_group(process_group_id: u32) -> io::Result<bool> {
Ok(true)
}
#[cfg(unix)]
/// Send SIGTERM to a specific process group ID (best-effort).
///
/// Returns `Ok(true)` when SIGTERM was delivered to an existing group and
/// `Ok(false)` when the group no longer exists.
pub fn terminate_process_group(process_group_id: u32) -> io::Result<bool> {
signal_process_group_id(process_group_id as libc::pid_t, libc::SIGTERM)
}
#[cfg(not(unix))]
/// No-op on non-Unix platforms.
pub fn terminate_process_group(_process_group_id: u32) -> io::Result<bool> {
Ok(false)
}
#[cfg(unix)]
/// Send SIGINT to a specific process group ID (best-effort).
pub fn interrupt_process_group(process_group_id: u32) -> io::Result<()> {
signal_process_group_id(process_group_id as libc::pid_t, libc::SIGINT).map(|_| ())
}
#[cfg(not(unix))]
/// No-op on non-Unix platforms.
pub fn interrupt_process_group(_process_group_id: u32) -> io::Result<()> {
Ok(())
}
#[cfg(unix)]
/// Kill a specific process group ID (best-effort).
pub fn kill_process_group(process_group_id: u32) -> io::Result<()> {
use std::io::ErrorKind;
let pgid = process_group_id as libc::pid_t;
let result = unsafe { libc::killpg(pgid, libc::SIGKILL) };
if result == -1 {
let err = io::Error::last_os_error();
if err.kind() != ErrorKind::NotFound && err.raw_os_error() != Some(libc::ESRCH) {
return Err(err);
}
}
Ok(())
signal_process_group_id(process_group_id as libc::pid_t, libc::SIGKILL).map(|_| ())
}
#[cfg(not(unix))]
+25 -1
View File
@@ -30,10 +30,13 @@ use tokio::task::JoinHandle;
use crate::process::ChildTerminator;
use crate::process::ProcessHandle;
use crate::process::ProcessSignal;
use crate::process::PtyHandles;
use crate::process::PtyMasterHandle;
use crate::process::SpawnedProcess;
use crate::process::TerminalSize;
#[cfg(unix)]
use crate::process::exit_code_from_status;
/// Returns true when ConPTY support is available (Windows only).
#[cfg(windows)]
@@ -54,6 +57,19 @@ struct PtyChildTerminator {
}
impl ChildTerminator for PtyChildTerminator {
fn signal(&mut self, signal: ProcessSignal) -> std::io::Result<()> {
match signal {
ProcessSignal::Interrupt => {
#[cfg(unix)]
if let Some(process_group_id) = self.process_group_id {
return crate::process_group::interrupt_process_group(process_group_id);
}
Err(crate::process::unsupported_signal(signal))
}
}
}
fn kill(&mut self) -> std::io::Result<()> {
#[cfg(unix)]
if let Some(process_group_id) = self.process_group_id {
@@ -81,6 +97,14 @@ struct RawPidTerminator {
#[cfg(unix)]
impl ChildTerminator for RawPidTerminator {
fn signal(&mut self, signal: ProcessSignal) -> std::io::Result<()> {
match signal {
ProcessSignal::Interrupt => {
crate::process_group::interrupt_process_group(self.process_group_id)
}
}
}
fn kill(&mut self) -> std::io::Result<()> {
crate::process_group::kill_process_group(self.process_group_id)
}
@@ -368,7 +392,7 @@ async fn spawn_process_preserving_fds(
let wait_exit_code = Arc::clone(&exit_code);
let wait_handle: JoinHandle<()> = tokio::task::spawn_blocking(move || {
let code = match child.wait() {
Ok(status) => status.code().unwrap_or(-1),
Ok(status) => exit_code_from_status(status),
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);