mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
fix(exec-server): retain output until streams close (#18946)
## Why A Mac Bazel run hit a flake in `server::handler::tests::output_and_exit_are_retained_after_notification_receiver_closes` where the read path observed process exit but lost the expected buffered stdout (`first\nsecond\n`). See the [GitHub Actions job](https://github.com/openai/codex/actions/runs/24758468552/job/72436716505) and [BuildBuddy invocation](https://app.buildbuddy.io/invocation/37475a12-4ef2-45fb-ab8a-e49a2aba1d59). The underlying race is that process exit is not the same thing as stdout/stderr closure. If a child or grandchild inherits the pipe write end, or a process duplicates it with `dup2`, the watched process can exit while the stream is still open and more output can still arrive. The exec-server was starting exited-process retention cleanup from the exit event, so the process entry could be removed before the output streams had actually closed. While stress-testing the exec-server unit suite, `server::handler::tests::long_poll_read_fails_after_session_resume` exposed a separate test race: it started a short-lived command that could exit and wake the pending long-poll read before the session-resume assertion observed the resumed-session error. That test is intended to cover resume eviction, not process-exit delivery, so this change keeps the process alive and quiet while the second connection resumes the session. ## What changed - Keep exec-server process entries retained until stdout/stderr streams close, then start the post-exit retention timer from the closed event. - Wake long-poll readers when the closed event is emitted. - Add focused `local_process` unit coverage that proves late output is still retained after the short test retention interval has elapsed, and that closed process entries are eventually evicted. - Add a local and remote regression test where a parent exits while a child keeps inherited stdout open. The child waits on an explicit release file, so the test deterministically observes exit first, releases the child, then requires a nonzero-wait read from the exit sequence to receive the late output. - In `codex-rs/exec-server/src/server/handler/tests.rs`, make `long_poll_read_fails_after_session_resume` run a long-lived silent command instead of a short command that prints and exits. This isolates the test to session-resume behavior and prevents a normal process exit from satisfying the pending long-poll read first. ## Testing - `cargo test -p codex-exec-server exec_process_retains_output_after_exit_until_streams_close` - `cargo test -p codex-exec-server local_process::tests` - `cargo test -p codex-exec-server` - `just fix -p codex-exec-server` - `bazel test //codex-rs/exec-server:exec-server-unit-tests //codex-rs/exec-server:exec-server-exec_process-test //codex-rs/exec-server:exec-server-file_system-test //codex-rs/exec-server:exec-server-http_client-test //codex-rs/exec-server:exec-server-initialize-test //codex-rs/exec-server:exec-server-process-test //codex-rs/exec-server:exec-server-websocket-test` - `bazel test --runs_per_test=25 //codex-rs/exec-server:exec-server-unit-tests` ## Documentation No docs update needed; this is an internal exec-server correctness fix.
This commit is contained in:
committed by
GitHub
Unverified
parent
9c0eced391
commit
491a3058f6
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -628,16 +629,7 @@ async fn watch_exit(
|
||||
.await;
|
||||
}
|
||||
|
||||
maybe_emit_closed(process_id.clone(), Arc::clone(&inner)).await;
|
||||
|
||||
tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
|
||||
let mut processes = inner.processes.lock().await;
|
||||
if matches!(
|
||||
processes.get(&process_id),
|
||||
Some(ProcessEntry::Running(process)) if process.exit_code == Some(exit_code)
|
||||
) {
|
||||
processes.remove(&process_id);
|
||||
}
|
||||
maybe_emit_closed(process_id, Arc::clone(&inner)).await;
|
||||
}
|
||||
|
||||
async fn finish_output_stream(process_id: ProcessId, inner: Arc<Inner>) {
|
||||
@@ -656,7 +648,7 @@ async fn finish_output_stream(process_id: ProcessId, inner: Arc<Inner>) {
|
||||
}
|
||||
|
||||
async fn maybe_emit_closed(process_id: ProcessId, inner: Arc<Inner>) {
|
||||
let notification = {
|
||||
let (notification, output_notify) = {
|
||||
let mut processes = inner.processes.lock().await;
|
||||
let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else {
|
||||
return;
|
||||
@@ -671,15 +663,30 @@ async fn maybe_emit_closed(process_id: ProcessId, inner: Arc<Inner>) {
|
||||
process.next_seq += 1;
|
||||
let _ = process.wake_tx.send(seq);
|
||||
process.events.publish(ExecProcessEvent::Closed { seq });
|
||||
Some(ExecClosedNotification {
|
||||
process_id: process_id.clone(),
|
||||
seq,
|
||||
})
|
||||
(
|
||||
ExecClosedNotification {
|
||||
process_id: process_id.clone(),
|
||||
seq,
|
||||
},
|
||||
Arc::clone(&process.output_notify),
|
||||
)
|
||||
};
|
||||
|
||||
let Some(notification) = notification else {
|
||||
return;
|
||||
};
|
||||
output_notify.notify_waiters();
|
||||
let cleanup_process_id = process_id.clone();
|
||||
let cleanup_inner = Arc::clone(&inner);
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
|
||||
let mut processes = cleanup_inner.processes.lock().await;
|
||||
match processes.entry(cleanup_process_id) {
|
||||
Entry::Occupied(entry) => {
|
||||
if matches!(entry.get(), ProcessEntry::Running(process) if process.closed) {
|
||||
entry.remove();
|
||||
}
|
||||
}
|
||||
Entry::Vacant(_) => {}
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(notifications) = notification_sender(&inner) {
|
||||
let _ = notifications
|
||||
@@ -700,6 +707,10 @@ fn notification_sender(inner: &Inner) -> Option<RpcNotificationSender> {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_config::types::ShellEnvironmentPolicyInherit;
|
||||
use codex_utils_pty::ProcessDriver;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
|
||||
fn test_exec_params(env: HashMap<String, String>) -> ExecParams {
|
||||
ExecParams {
|
||||
@@ -748,4 +759,222 @@ mod tests {
|
||||
|
||||
assert_eq!(child_env(¶ms), expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn exited_process_retains_late_output_past_retention() {
|
||||
let backend = LocalProcess::default();
|
||||
let mut process = spawn_test_process(&backend, "proc-late-output").await;
|
||||
|
||||
process.exit(/*exit_code*/ 0);
|
||||
let exit_response =
|
||||
read_process_until_change(&backend, &process.process_id, /*after_seq*/ None).await;
|
||||
assert_eq!(
|
||||
exit_response,
|
||||
ReadResponse {
|
||||
chunks: Vec::new(),
|
||||
next_seq: 2,
|
||||
exited: true,
|
||||
exit_code: Some(0),
|
||||
closed: false,
|
||||
failure: None,
|
||||
}
|
||||
);
|
||||
|
||||
tokio::time::sleep(EXITED_PROCESS_RETENTION + Duration::from_millis(10)).await;
|
||||
process
|
||||
.stdout_tx
|
||||
.send(b"late output after retention\n".to_vec())
|
||||
.await
|
||||
.expect("send late stdout");
|
||||
|
||||
let late_response =
|
||||
read_process_until_change(&backend, &process.process_id, /*after_seq*/ Some(1)).await;
|
||||
assert_eq!(
|
||||
late_response.chunks,
|
||||
vec![ProcessOutputChunk {
|
||||
seq: 2,
|
||||
stream: ExecOutputStream::Stdout,
|
||||
chunk: b"late output after retention\n".to_vec().into(),
|
||||
}]
|
||||
);
|
||||
assert_eq!(late_response.exit_code, Some(0));
|
||||
assert!(!late_response.closed);
|
||||
|
||||
drop(process.stdout_tx);
|
||||
drop(process.stderr_tx);
|
||||
let _closed_response = timeout(
|
||||
Duration::from_secs(1),
|
||||
read_process_until_closed(&backend, &process.process_id),
|
||||
)
|
||||
.await
|
||||
.expect("process should close");
|
||||
backend.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn closed_process_is_evicted_after_retention() {
|
||||
let backend = LocalProcess::default();
|
||||
let mut process = spawn_test_process(&backend, "proc-closed-eviction").await;
|
||||
let process_id = process.process_id.clone();
|
||||
|
||||
process.exit(/*exit_code*/ 0);
|
||||
drop(process.stdout_tx);
|
||||
drop(process.stderr_tx);
|
||||
|
||||
let closed_response = timeout(
|
||||
Duration::from_secs(1),
|
||||
read_process_until_closed(&backend, &process_id),
|
||||
)
|
||||
.await
|
||||
.expect("process should close");
|
||||
assert!(closed_response.closed);
|
||||
|
||||
timeout(Duration::from_secs(1), async {
|
||||
loop {
|
||||
{
|
||||
let processes = backend.inner.processes.lock().await;
|
||||
if !processes.contains_key(&process_id) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(5)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("closed process should be evicted");
|
||||
backend.shutdown().await;
|
||||
}
|
||||
|
||||
struct TestProcess {
|
||||
process_id: ProcessId,
|
||||
stdout_tx: mpsc::Sender<Vec<u8>>,
|
||||
stderr_tx: mpsc::Sender<Vec<u8>>,
|
||||
exit_tx: Option<oneshot::Sender<i32>>,
|
||||
}
|
||||
|
||||
impl TestProcess {
|
||||
fn exit(&mut self, exit_code: i32) {
|
||||
self.exit_tx
|
||||
.take()
|
||||
.expect("process should not have exited")
|
||||
.send(exit_code)
|
||||
.expect("send process exit");
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_test_process(backend: &LocalProcess, process_id: &str) -> TestProcess {
|
||||
let process_id = ProcessId::from(process_id);
|
||||
let (stdout_tx, stdout_rx) = mpsc::channel(16);
|
||||
let (stderr_tx, stderr_rx) = mpsc::channel(16);
|
||||
let (exit_tx, exit_rx) = oneshot::channel();
|
||||
let output_notify = Arc::new(Notify::new());
|
||||
let (wake_tx, _wake_rx) = watch::channel(0);
|
||||
let events = ExecProcessEventLog::new(
|
||||
PROCESS_EVENT_CHANNEL_CAPACITY,
|
||||
RETAINED_OUTPUT_BYTES_PER_PROCESS,
|
||||
);
|
||||
|
||||
let mut processes = backend.inner.processes.lock().await;
|
||||
let previous = processes.insert(
|
||||
process_id.clone(),
|
||||
ProcessEntry::Running(Box::new(RunningProcess {
|
||||
session: dummy_session(),
|
||||
tty: false,
|
||||
pipe_stdin: false,
|
||||
output: VecDeque::new(),
|
||||
retained_bytes: 0,
|
||||
next_seq: 1,
|
||||
exit_code: None,
|
||||
wake_tx: wake_tx.clone(),
|
||||
events: events.clone(),
|
||||
output_notify: Arc::clone(&output_notify),
|
||||
open_streams: 2,
|
||||
closed: false,
|
||||
})),
|
||||
);
|
||||
assert!(previous.is_none());
|
||||
drop(processes);
|
||||
|
||||
tokio::spawn(stream_output(
|
||||
process_id.clone(),
|
||||
ExecOutputStream::Stdout,
|
||||
stdout_rx,
|
||||
Arc::clone(&backend.inner),
|
||||
Arc::clone(&output_notify),
|
||||
));
|
||||
tokio::spawn(stream_output(
|
||||
process_id.clone(),
|
||||
ExecOutputStream::Stderr,
|
||||
stderr_rx,
|
||||
Arc::clone(&backend.inner),
|
||||
Arc::clone(&output_notify),
|
||||
));
|
||||
tokio::spawn(watch_exit(
|
||||
process_id.clone(),
|
||||
exit_rx,
|
||||
Arc::clone(&backend.inner),
|
||||
output_notify,
|
||||
));
|
||||
|
||||
TestProcess {
|
||||
process_id,
|
||||
stdout_tx,
|
||||
stderr_tx,
|
||||
exit_tx: Some(exit_tx),
|
||||
}
|
||||
}
|
||||
|
||||
fn dummy_session() -> ExecCommandSession {
|
||||
let (writer_tx, _writer_rx) = mpsc::channel(1);
|
||||
let (_stdout_tx, stdout_rx) = tokio::sync::broadcast::channel(1);
|
||||
let (_stderr_tx, stderr_rx) = tokio::sync::broadcast::channel(1);
|
||||
let (_exit_tx, exit_rx) = oneshot::channel();
|
||||
|
||||
codex_utils_pty::spawn_from_driver(ProcessDriver {
|
||||
writer_tx,
|
||||
stdout_rx,
|
||||
stderr_rx: Some(stderr_rx),
|
||||
exit_rx,
|
||||
terminator: None,
|
||||
writer_handle: None,
|
||||
resizer: None,
|
||||
})
|
||||
.session
|
||||
}
|
||||
|
||||
async fn read_process_until_change(
|
||||
backend: &LocalProcess,
|
||||
process_id: &ProcessId,
|
||||
after_seq: Option<u64>,
|
||||
) -> ReadResponse {
|
||||
timeout(
|
||||
Duration::from_secs(1),
|
||||
backend.exec_read(ReadParams {
|
||||
process_id: process_id.clone(),
|
||||
after_seq,
|
||||
max_bytes: None,
|
||||
wait_ms: Some(1_000),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("process read should finish")
|
||||
.expect("process read")
|
||||
}
|
||||
|
||||
async fn read_process_until_closed(
|
||||
backend: &LocalProcess,
|
||||
process_id: &ProcessId,
|
||||
) -> ReadResponse {
|
||||
let mut after_seq = None;
|
||||
loop {
|
||||
let response = read_process_until_change(backend, process_id, after_seq).await;
|
||||
if response.closed {
|
||||
return response;
|
||||
}
|
||||
for chunk in &response.chunks {
|
||||
after_seq = Some(chunk.seq);
|
||||
}
|
||||
after_seq = response.next_seq.checked_sub(1).or(after_seq);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,13 +170,12 @@ async fn long_poll_read_fails_after_session_resume() {
|
||||
.expect("initialize");
|
||||
first_handler.initialized().expect("initialized");
|
||||
|
||||
// Keep the process quiet and alive so the pending read can only complete
|
||||
// after session resume, not because the process produced output or exited.
|
||||
first_handler
|
||||
.exec(exec_params_with_argv(
|
||||
"proc-long-poll",
|
||||
shell_argv(
|
||||
"sleep 0.1; printf resumed",
|
||||
"ping -n 2 127.0.0.1 >NUL && echo resumed",
|
||||
),
|
||||
shell_argv("sleep 5", "ping -n 6 127.0.0.1 >NUL"),
|
||||
))
|
||||
.await
|
||||
.expect("start process");
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
use std::env;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_exec_server::CODEX_FS_HELPER_ARG1;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
@@ -11,6 +16,11 @@ use ctor::ctor;
|
||||
|
||||
pub(crate) mod exec_server;
|
||||
|
||||
pub(crate) const DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG: &str =
|
||||
"--codex-test-delayed-output-after-exit-parent";
|
||||
|
||||
const DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG: &str = "--codex-test-delayed-output-after-exit-child";
|
||||
|
||||
#[ctor]
|
||||
pub static TEST_BINARY_DISPATCH_GUARD: Option<TestBinaryDispatchGuard> = {
|
||||
let guard = configure_test_binary_dispatch("codex-exec-server-tests", |exe_name, argv1| {
|
||||
@@ -22,6 +32,7 @@ pub static TEST_BINARY_DISPATCH_GUARD: Option<TestBinaryDispatchGuard> = {
|
||||
}
|
||||
TestBinaryDispatchMode::InstallAliases
|
||||
});
|
||||
maybe_run_delayed_output_after_exit_from_test_binary();
|
||||
maybe_run_exec_server_from_test_binary(guard.as_ref());
|
||||
guard
|
||||
};
|
||||
@@ -39,6 +50,82 @@ pub(crate) fn current_test_binary_helper_paths() -> anyhow::Result<(PathBuf, Opt
|
||||
Ok((current_exe, codex_linux_sandbox_exe))
|
||||
}
|
||||
|
||||
fn maybe_run_delayed_output_after_exit_from_test_binary() {
|
||||
let mut args = env::args();
|
||||
let _program = args.next();
|
||||
let Some(command) = args.next() else {
|
||||
return;
|
||||
};
|
||||
match command.as_str() {
|
||||
DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG => {
|
||||
let release_path = next_release_path_arg(args);
|
||||
run_delayed_output_after_exit_parent(&release_path);
|
||||
}
|
||||
DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG => {
|
||||
let release_path = next_release_path_arg(args);
|
||||
run_delayed_output_after_exit_child(&release_path);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn next_release_path_arg(mut args: impl Iterator<Item = String>) -> PathBuf {
|
||||
let Some(release_path) = args.next() else {
|
||||
eprintln!("expected release path");
|
||||
std::process::exit(1);
|
||||
};
|
||||
if args.next().is_some() {
|
||||
eprintln!("unexpected extra arguments");
|
||||
std::process::exit(1);
|
||||
}
|
||||
PathBuf::from(release_path)
|
||||
}
|
||||
|
||||
fn run_delayed_output_after_exit_parent(release_path: &Path) {
|
||||
let current_exe = match env::current_exe() {
|
||||
Ok(current_exe) => current_exe,
|
||||
Err(error) => {
|
||||
eprintln!("failed to resolve current test binary: {error}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
match Command::new(current_exe)
|
||||
.arg(DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG)
|
||||
.arg(release_path)
|
||||
.stdin(Stdio::null())
|
||||
.spawn()
|
||||
{
|
||||
Ok(_) => std::process::exit(0),
|
||||
Err(error) => {
|
||||
eprintln!("failed to spawn delayed output child: {error}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn run_delayed_output_after_exit_child(release_path: &Path) {
|
||||
for _ in 0..1_000 {
|
||||
if release_path.exists() {
|
||||
let mut stdout = std::io::stdout().lock();
|
||||
if let Err(error) = writeln!(stdout, "late output after exit") {
|
||||
eprintln!("failed to write delayed output: {error}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
if let Err(error) = stdout.flush() {
|
||||
eprintln!("failed to flush delayed output: {error}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
std::process::exit(0);
|
||||
}
|
||||
std::thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
eprintln!(
|
||||
"timed out waiting for release path {}",
|
||||
release_path.display()
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
fn maybe_run_exec_server_from_test_binary(guard: Option<&TestBinaryDispatchGuard>) {
|
||||
let mut args = env::args();
|
||||
let _program = args.next();
|
||||
|
||||
@@ -17,11 +17,14 @@ use codex_exec_server::ReadResponse;
|
||||
use codex_exec_server::StartedExecProcess;
|
||||
use codex_exec_server::WriteStatus;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use test_case::test_case;
|
||||
use tokio::sync::watch;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use common::DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG;
|
||||
use common::current_test_binary_helper_paths;
|
||||
use common::exec_server::ExecServerHarness;
|
||||
use common::exec_server::exec_server;
|
||||
|
||||
@@ -320,6 +323,81 @@ async fn assert_exec_process_replays_events_after_close(use_remote: bool) -> Res
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_exec_process_retains_output_after_exit_until_streams_close(
|
||||
use_remote: bool,
|
||||
) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let (helper_binary, _) = current_test_binary_helper_paths()?;
|
||||
let release_dir = TempDir::new()?;
|
||||
let release_path = release_dir.path().join("release-delayed-output");
|
||||
let process_id = "proc-output-after-exit".to_string();
|
||||
let session = context
|
||||
.backend
|
||||
.start(ExecParams {
|
||||
process_id: process_id.clone().into(),
|
||||
argv: vec![
|
||||
helper_binary.to_string_lossy().into_owned(),
|
||||
DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG.to_string(),
|
||||
release_path.to_string_lossy().into_owned(),
|
||||
],
|
||||
cwd: std::env::current_dir()?,
|
||||
env_policy: /*env_policy*/ None,
|
||||
env: Default::default(),
|
||||
tty: false,
|
||||
pipe_stdin: false,
|
||||
arg0: None,
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(session.process.process_id().as_str(), process_id);
|
||||
|
||||
let StartedExecProcess { process } = session;
|
||||
|
||||
let exit_response = timeout(
|
||||
Duration::from_secs(2),
|
||||
process.read(
|
||||
/*after_seq*/ None,
|
||||
/*max_bytes*/ None,
|
||||
/*wait_ms*/ Some(2_000),
|
||||
),
|
||||
)
|
||||
.await??;
|
||||
assert!(
|
||||
exit_response.chunks.is_empty(),
|
||||
"parent should exit before child writes delayed output"
|
||||
);
|
||||
assert_eq!(exit_response.exit_code, Some(0));
|
||||
assert!(!exit_response.closed);
|
||||
let exit_seq = exit_response
|
||||
.next_seq
|
||||
.checked_sub(1)
|
||||
.context("exit response should advance next_seq")?;
|
||||
std::fs::write(&release_path, b"go")?;
|
||||
|
||||
let late_response = timeout(
|
||||
Duration::from_secs(2),
|
||||
process.read(
|
||||
/*after_seq*/ Some(exit_seq),
|
||||
/*max_bytes*/ None,
|
||||
/*wait_ms*/ Some(2_000),
|
||||
),
|
||||
)
|
||||
.await??;
|
||||
let mut late_output = String::new();
|
||||
for chunk in late_response.chunks {
|
||||
assert_eq!(chunk.stream, ExecOutputStream::Stdout);
|
||||
late_output.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner()));
|
||||
}
|
||||
assert_eq!(late_output, "late output after exit\n");
|
||||
|
||||
let wake_rx = process.subscribe_wake();
|
||||
let actual = collect_process_output_from_reads(process, wake_rx).await?;
|
||||
assert_eq!(
|
||||
actual,
|
||||
("late output after exit\n".to_string(), Some(0), true)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> {
|
||||
let context = create_process_context(use_remote).await?;
|
||||
let process_id = "proc-stdin".to_string();
|
||||
@@ -586,6 +664,17 @@ async fn exec_process_replays_events_after_close(use_remote: bool) -> Result<()>
|
||||
assert_exec_process_replays_events_after_close(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
// Serialize tests that launch a real exec-server process through the full CLI.
|
||||
#[serial_test::serial(remote_exec_server)]
|
||||
async fn exec_process_retains_output_after_exit_until_streams_close(
|
||||
use_remote: bool,
|
||||
) -> Result<()> {
|
||||
assert_exec_process_retains_output_after_exit_until_streams_close(use_remote).await
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
|
||||
Reference in New Issue
Block a user