#![allow(dead_code)] use std::path::PathBuf; use std::process::Stdio; use std::time::Duration; use anyhow::anyhow; use codex_exec_server_protocol::JSONRPCMessage; use codex_exec_server_protocol::JSONRPCNotification; use codex_exec_server_protocol::JSONRPCRequest; use codex_exec_server_protocol::RequestId; use futures::SinkExt; use futures::StreamExt; use tempfile::TempDir; use tokio::io::AsyncBufReadExt; use tokio::io::BufReader; use tokio::io::copy_bidirectional; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::process::Child; use tokio::process::Command; use tokio::sync::oneshot; use tokio::task::JoinHandle; use tokio::time::Instant; use tokio::time::sleep; use tokio::time::timeout; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); const CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(25); const EVENT_TIMEOUT: Duration = Duration::from_secs(5); pub(crate) struct ExecServerHarness { _codex_home: TempDir, _helper_paths: TestCodexHelperPaths, child: Child, websocket_url: String, websocket: tokio_tungstenite::WebSocketStream< tokio_tungstenite::MaybeTlsStream, >, next_request_id: i64, } impl Drop for ExecServerHarness { fn drop(&mut self) { let _ = self.child.start_kill(); } } pub(crate) struct TestCodexHelperPaths { pub(crate) codex_exe: PathBuf, pub(crate) codex_linux_sandbox_exe: Option, } pub(crate) struct DisconnectableWebSocketProxy { websocket_url: String, pause_tx: Option>, blocked_connection_rx: Option>, resume_tx: Option>, task: JoinHandle<()>, } impl Drop for DisconnectableWebSocketProxy { fn drop(&mut self) { self.task.abort(); } } pub(crate) fn test_codex_helper_paths() -> anyhow::Result { let (helper_binary, codex_linux_sandbox_exe) = super::current_test_binary_helper_paths()?; Ok(TestCodexHelperPaths { codex_exe: helper_binary, codex_linux_sandbox_exe, }) } pub(crate) async fn exec_server() -> anyhow::Result { exec_server_with_env(std::iter::empty::<(&str, &str)>()).await } pub(crate) async fn exec_server_with_env(env: I) -> anyhow::Result where I: IntoIterator, K: AsRef, V: AsRef, { let helper_paths = test_codex_helper_paths()?; let codex_home = TempDir::new()?; let mut child = Command::new(&helper_paths.codex_exe); child.args(["exec-server", "--listen", "ws://127.0.0.1:0"]); child.stdin(Stdio::null()); child.stdout(Stdio::piped()); child.stderr(Stdio::inherit()); child.kill_on_drop(true); child.env("CODEX_HOME", codex_home.path()); child.envs(env); let mut child = child.spawn()?; let websocket_url = read_listen_url_from_stdout(&mut child).await?; let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?; Ok(ExecServerHarness { _codex_home: codex_home, _helper_paths: helper_paths, child, websocket_url, websocket, next_request_id: 1, }) } impl ExecServerHarness { pub(crate) fn websocket_url(&self) -> &str { &self.websocket_url } pub(crate) async fn disconnect_websocket(&mut self) -> anyhow::Result<()> { self.websocket.close(None).await?; Ok(()) } pub(crate) async fn reconnect_websocket(&mut self) -> anyhow::Result<()> { let (websocket, _) = connect_websocket_when_ready(&self.websocket_url).await?; self.websocket = websocket; Ok(()) } pub(crate) async fn disconnectable_websocket_proxy( &self, ) -> anyhow::Result { let upstream = self .websocket_url .strip_prefix("ws://") .ok_or_else(|| anyhow!("exec-server websocket URL must use ws://"))? .to_string(); let listener = TcpListener::bind("127.0.0.1:0").await?; let websocket_url = format!("ws://{}", listener.local_addr()?); let (pause_tx, pause_rx) = oneshot::channel(); let (blocked_connection_tx, blocked_connection_rx) = oneshot::channel(); let (resume_tx, resume_rx) = oneshot::channel(); let task = tokio::spawn(run_disconnectable_proxy( listener, upstream, pause_rx, blocked_connection_tx, resume_rx, )); Ok(DisconnectableWebSocketProxy { websocket_url, pause_tx: Some(pause_tx), blocked_connection_rx: Some(blocked_connection_rx), resume_tx: Some(resume_tx), task, }) } pub(crate) async fn send_request( &mut self, method: &str, params: serde_json::Value, ) -> anyhow::Result { let id = RequestId::Integer(self.next_request_id); self.next_request_id += 1; self.send_message(JSONRPCMessage::Request(JSONRPCRequest { id: id.clone(), method: method.to_string(), params: Some(params), trace: None, })) .await?; Ok(id) } pub(crate) async fn send_notification( &mut self, method: &str, params: serde_json::Value, ) -> anyhow::Result<()> { self.send_message(JSONRPCMessage::Notification(JSONRPCNotification { method: method.to_string(), params: Some(params), })) .await } pub(crate) async fn send_raw_text(&mut self, text: &str) -> anyhow::Result<()> { self.websocket .send(Message::Text(text.to_string().into())) .await?; Ok(()) } pub(crate) async fn send_raw_binary(&mut self, bytes: Vec) -> anyhow::Result<()> { self.websocket.send(Message::Binary(bytes.into())).await?; Ok(()) } pub(crate) async fn next_event(&mut self) -> anyhow::Result { self.next_event_with_timeout(EVENT_TIMEOUT).await } pub(crate) async fn wait_for_event( &mut self, mut predicate: F, ) -> anyhow::Result where F: FnMut(&JSONRPCMessage) -> bool, { let deadline = Instant::now() + EVENT_TIMEOUT; loop { let now = Instant::now(); if now >= deadline { return Err(anyhow!( "timed out waiting for matching exec-server event after {EVENT_TIMEOUT:?}" )); } let remaining = deadline.duration_since(now); let event = self.next_event_with_timeout(remaining).await?; if predicate(&event) { return Ok(event); } } } pub(crate) async fn shutdown(&mut self) -> anyhow::Result<()> { self.child.start_kill()?; timeout(CONNECT_TIMEOUT, self.child.wait()) .await .map_err(|_| anyhow!("timed out waiting for exec-server shutdown"))??; Ok(()) } async fn send_message(&mut self, message: JSONRPCMessage) -> anyhow::Result<()> { let encoded = serde_json::to_string(&message)?; self.websocket.send(Message::Text(encoded.into())).await?; Ok(()) } async fn next_event_with_timeout( &mut self, timeout_duration: Duration, ) -> anyhow::Result { loop { let frame = timeout(timeout_duration, self.websocket.next()) .await .map_err(|_| anyhow!("timed out waiting for exec-server websocket event"))? .ok_or_else(|| anyhow!("exec-server websocket closed"))??; match frame { Message::Text(text) => { return Ok(serde_json::from_str(text.as_ref())?); } Message::Binary(bytes) => { return Ok(serde_json::from_slice(bytes.as_ref())?); } Message::Close(_) => return Err(anyhow!("exec-server websocket closed")), Message::Ping(_) | Message::Pong(_) => {} _ => {} } } } } impl DisconnectableWebSocketProxy { pub(crate) fn websocket_url(&self) -> &str { &self.websocket_url } pub(crate) async fn pause_and_disconnect(&mut self) -> anyhow::Result<()> { self.pause_tx .take() .ok_or_else(|| anyhow!("disconnectable websocket proxy is already paused"))? .send(()) .map_err(|_| anyhow!("disconnectable websocket proxy stopped"))?; let blocked_connection_rx = self .blocked_connection_rx .take() .ok_or_else(|| anyhow!("disconnectable websocket proxy is already paused"))?; timeout(CONNECT_TIMEOUT, blocked_connection_rx) .await .map_err(|_| anyhow!("timed out waiting for client reconnect attempt"))? .map_err(|_| anyhow!("disconnectable websocket proxy stopped"))?; Ok(()) } pub(crate) fn resume(&mut self) -> anyhow::Result<()> { self.resume_tx .take() .ok_or_else(|| anyhow!("disconnectable websocket proxy is already resumed"))? .send(()) .map_err(|_| anyhow!("disconnectable websocket proxy stopped"))?; Ok(()) } } async fn run_disconnectable_proxy( listener: TcpListener, upstream: String, pause_rx: oneshot::Receiver<()>, blocked_connection_tx: oneshot::Sender<()>, mut resume_rx: oneshot::Receiver<()>, ) { let Ok((mut downstream, _)) = listener.accept().await else { return; }; let Ok(mut upstream_stream) = TcpStream::connect(&upstream).await else { return; }; tokio::select! { _ = copy_bidirectional(&mut downstream, &mut upstream_stream) => return, _ = pause_rx => {} } drop(downstream); drop(upstream_stream); let mut blocked_connection_tx = Some(blocked_connection_tx); loop { tokio::select! { _ = &mut resume_rx => break, accepted = listener.accept() => { let Ok((blocked, _)) = accepted else { break; }; drop(blocked); if let Some(blocked_connection_tx) = blocked_connection_tx.take() { let _ = blocked_connection_tx.send(()); } } } } loop { let Ok((mut downstream, _)) = listener.accept().await else { return; }; let Ok(mut upstream_stream) = TcpStream::connect(&upstream).await else { continue; }; let _ = copy_bidirectional(&mut downstream, &mut upstream_stream).await; } } async fn connect_websocket_when_ready( websocket_url: &str, ) -> anyhow::Result<( tokio_tungstenite::WebSocketStream>, tokio_tungstenite::tungstenite::handshake::client::Response, )> { let deadline = Instant::now() + CONNECT_TIMEOUT; loop { match connect_async(websocket_url).await { Ok(websocket) => return Ok(websocket), Err(err) if Instant::now() < deadline && matches!( err, tokio_tungstenite::tungstenite::Error::Io(ref io_err) if io_err.kind() == std::io::ErrorKind::ConnectionRefused ) => { sleep(CONNECT_RETRY_INTERVAL).await; } Err(err) => return Err(err.into()), } } } async fn read_listen_url_from_stdout(child: &mut Child) -> anyhow::Result { let stdout = child .stdout .take() .ok_or_else(|| anyhow!("failed to capture exec-server stdout"))?; let mut lines = BufReader::new(stdout).lines(); let deadline = Instant::now() + CONNECT_TIMEOUT; loop { let now = Instant::now(); if now >= deadline { return Err(anyhow!( "timed out waiting for exec-server listen URL on stdout after {CONNECT_TIMEOUT:?}" )); } let remaining = deadline.duration_since(now); let line = timeout(remaining, lines.next_line()) .await .map_err(|_| anyhow!("timed out waiting for exec-server stdout"))?? .ok_or_else(|| anyhow!("exec-server stdout closed before emitting listen URL"))?; let listen_url = line.trim(); if listen_url.starts_with("ws://") { return Ok(listen_url.to_string()); } } }