diff --git a/codex-rs/mcp-server/tests/suite/codex_tool.rs b/codex-rs/mcp-server/tests/suite/codex_tool.rs index edf2f1b02..c898b8f0d 100644 --- a/codex-rs/mcp-server/tests/suite/codex_tool.rs +++ b/codex-rs/mcp-server/tests/suite/codex_tool.rs @@ -505,6 +505,9 @@ base_url = "{server_uri}/v1" wire_api = "responses" request_max_retries = 0 stream_max_retries = 0 + +[features] +remote_models = false "# ), ) diff --git a/codex-rs/utils/pty/src/tests.rs b/codex-rs/utils/pty/src/tests.rs index 89e72eeb4..efbaaf6b8 100644 --- a/codex-rs/utils/pty/src/tests.rs +++ b/codex-rs/utils/pty/src/tests.rs @@ -93,6 +93,57 @@ async fn collect_output_until_exit( } } +async fn wait_for_python_repl_ready( + writer: &tokio::sync::mpsc::Sender>, + output_rx: &mut tokio::sync::broadcast::Receiver>, + timeout_ms: u64, + newline: &str, +) -> anyhow::Result> { + let mut collected = Vec::new(); + let marker = "__codex_pty_ready__"; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms); + let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 }); + + while tokio::time::Instant::now() < deadline { + writer + .send(format!("print('{marker}'){newline}").into_bytes()) + .await?; + + let probe_deadline = tokio::time::Instant::now() + probe_window; + loop { + let now = tokio::time::Instant::now(); + if now >= deadline || now >= probe_deadline { + break; + } + let remaining = std::cmp::min( + deadline.saturating_duration_since(now), + probe_deadline.saturating_duration_since(now), + ); + match tokio::time::timeout(remaining, output_rx.recv()).await { + Ok(Ok(chunk)) => { + collected.extend_from_slice(&chunk); + if String::from_utf8_lossy(&collected).contains(marker) { + return Ok(collected); + } + } + Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue, + Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => { + anyhow::bail!( + "PTY output closed while waiting for Python REPL readiness: {:?}", + String::from_utf8_lossy(&collected) + ); + } + Err(_) => break, + } + } + } + + anyhow::bail!( + "timed out waiting for Python REPL readiness in PTY: {:?}", + String::from_utf8_lossy(&collected) + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> { let Some(python) = find_python() else { @@ -103,15 +154,20 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> { let env_map: HashMap = std::env::vars().collect(); let spawned = spawn_pty_process(&python, &[], Path::new("."), &env_map, &None).await?; let writer = spawned.session.writer_sender(); + let mut output_rx = spawned.output_rx; let newline = if cfg!(windows) { "\r\n" } else { "\n" }; + let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 }; + let mut output = + wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?; writer .send(format!("print('hello from pty'){newline}").into_bytes()) .await?; writer.send(format!("exit(){newline}").into_bytes()).await?; let timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 }; - let (output, code) = - collect_output_until_exit(spawned.output_rx, spawned.exit_rx, timeout_ms).await; + let (remaining_output, code) = + collect_output_until_exit(output_rx, spawned.exit_rx, timeout_ms).await; + output.extend_from_slice(&remaining_output); let text = String::from_utf8_lossy(&output); assert!(