mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
829f5b6b59
## Why The app-server and exec-server expose separate JSON-RPC APIs, but exec-server currently sources its serialized protocol and envelope types through app-server-oriented code. Giving each API an explicit owner makes the crate boundary legible without introducing shared generic envelopes. ## What changed - Added `codex-exec-server-protocol` to own exec DTOs, process IDs, and JSON-RPC envelopes. - Updated exec-server clients, transports, handlers, and tests to use the new crate. - Exposed app-server's existing JSON-RPC types through a public `rpc` module while retaining root re-exports. - Preserved existing wire shapes, including exec `PathUri` behavior. ## Stack This is PR 1 of 6. Next: [PR #29721](https://github.com/openai/codex/pull/29721), which moves auth mode below the app wire boundary. ## Validation - Exec-server protocol and server coverage passed in the focused protocol test runs. - App-server protocol schema fixtures passed.
393 lines
12 KiB
Rust
393 lines
12 KiB
Rust
#![allow(dead_code)]
|
|
|
|
use std::path::PathBuf;
|
|
use std::process::Stdio;
|
|
use std::time::Duration;
|
|
|
|
use anyhow::anyhow;
|
|
use codex_exec_server_protocol::JSONRPCMessage;
|
|
use codex_exec_server_protocol::JSONRPCNotification;
|
|
use codex_exec_server_protocol::JSONRPCRequest;
|
|
use codex_exec_server_protocol::RequestId;
|
|
use futures::SinkExt;
|
|
use futures::StreamExt;
|
|
use tempfile::TempDir;
|
|
use tokio::io::AsyncBufReadExt;
|
|
use tokio::io::BufReader;
|
|
use tokio::io::copy_bidirectional;
|
|
use tokio::net::TcpListener;
|
|
use tokio::net::TcpStream;
|
|
use tokio::process::Child;
|
|
use tokio::process::Command;
|
|
use tokio::sync::oneshot;
|
|
use tokio::task::JoinHandle;
|
|
use tokio::time::Instant;
|
|
use tokio::time::sleep;
|
|
use tokio::time::timeout;
|
|
use tokio_tungstenite::connect_async;
|
|
use tokio_tungstenite::tungstenite::Message;
|
|
|
|
/// Upper bound used for startup/teardown waits in this module: reading the
/// listen URL from stdout, retrying the websocket connection, waiting for the
/// child to exit, and waiting for a blocked proxy connection.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Pause between websocket connection attempts while the server still refuses
/// connections.
const CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(25);

/// Default time budget for receiving (or matching) a single exec-server event.
const EVENT_TIMEOUT: Duration = Duration::from_secs(5);
|
|
|
|
pub(crate) struct ExecServerHarness {
|
|
_codex_home: TempDir,
|
|
_helper_paths: TestCodexHelperPaths,
|
|
child: Child,
|
|
websocket_url: String,
|
|
websocket: tokio_tungstenite::WebSocketStream<
|
|
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
|
|
>,
|
|
next_request_id: i64,
|
|
}
|
|
|
|
impl Drop for ExecServerHarness {
|
|
fn drop(&mut self) {
|
|
let _ = self.child.start_kill();
|
|
}
|
|
}
|
|
|
|
/// Locations of the helper executables used by the tests.
pub(crate) struct TestCodexHelperPaths {
    /// Executable that is launched with the `exec-server` subcommand.
    pub(crate) codex_exe: PathBuf,
    /// Optional path to a Linux sandbox helper executable, when one is available.
    pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
}
|
|
|
|
pub(crate) struct DisconnectableWebSocketProxy {
|
|
websocket_url: String,
|
|
pause_tx: Option<oneshot::Sender<()>>,
|
|
blocked_connection_rx: Option<oneshot::Receiver<()>>,
|
|
resume_tx: Option<oneshot::Sender<()>>,
|
|
task: JoinHandle<()>,
|
|
}
|
|
|
|
impl Drop for DisconnectableWebSocketProxy {
|
|
fn drop(&mut self) {
|
|
self.task.abort();
|
|
}
|
|
}
|
|
|
|
pub(crate) fn test_codex_helper_paths() -> anyhow::Result<TestCodexHelperPaths> {
|
|
let (helper_binary, codex_linux_sandbox_exe) = super::current_test_binary_helper_paths()?;
|
|
Ok(TestCodexHelperPaths {
|
|
codex_exe: helper_binary,
|
|
codex_linux_sandbox_exe,
|
|
})
|
|
}
|
|
|
|
pub(crate) async fn exec_server() -> anyhow::Result<ExecServerHarness> {
|
|
exec_server_with_env(std::iter::empty::<(&str, &str)>()).await
|
|
}
|
|
|
|
pub(crate) async fn exec_server_with_env<I, K, V>(env: I) -> anyhow::Result<ExecServerHarness>
|
|
where
|
|
I: IntoIterator<Item = (K, V)>,
|
|
K: AsRef<std::ffi::OsStr>,
|
|
V: AsRef<std::ffi::OsStr>,
|
|
{
|
|
let helper_paths = test_codex_helper_paths()?;
|
|
let codex_home = TempDir::new()?;
|
|
let mut child = Command::new(&helper_paths.codex_exe);
|
|
child.args(["exec-server", "--listen", "ws://127.0.0.1:0"]);
|
|
child.stdin(Stdio::null());
|
|
child.stdout(Stdio::piped());
|
|
child.stderr(Stdio::inherit());
|
|
child.kill_on_drop(true);
|
|
child.env("CODEX_HOME", codex_home.path());
|
|
child.envs(env);
|
|
let mut child = child.spawn()?;
|
|
|
|
let websocket_url = read_listen_url_from_stdout(&mut child).await?;
|
|
let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?;
|
|
Ok(ExecServerHarness {
|
|
_codex_home: codex_home,
|
|
_helper_paths: helper_paths,
|
|
child,
|
|
websocket_url,
|
|
websocket,
|
|
next_request_id: 1,
|
|
})
|
|
}
|
|
|
|
impl ExecServerHarness {
|
|
pub(crate) fn websocket_url(&self) -> &str {
|
|
&self.websocket_url
|
|
}
|
|
|
|
pub(crate) async fn disconnect_websocket(&mut self) -> anyhow::Result<()> {
|
|
self.websocket.close(None).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn reconnect_websocket(&mut self) -> anyhow::Result<()> {
|
|
let (websocket, _) = connect_websocket_when_ready(&self.websocket_url).await?;
|
|
self.websocket = websocket;
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn disconnectable_websocket_proxy(
|
|
&self,
|
|
) -> anyhow::Result<DisconnectableWebSocketProxy> {
|
|
let upstream = self
|
|
.websocket_url
|
|
.strip_prefix("ws://")
|
|
.ok_or_else(|| anyhow!("exec-server websocket URL must use ws://"))?
|
|
.to_string();
|
|
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
|
let websocket_url = format!("ws://{}", listener.local_addr()?);
|
|
let (pause_tx, pause_rx) = oneshot::channel();
|
|
let (blocked_connection_tx, blocked_connection_rx) = oneshot::channel();
|
|
let (resume_tx, resume_rx) = oneshot::channel();
|
|
let task = tokio::spawn(run_disconnectable_proxy(
|
|
listener,
|
|
upstream,
|
|
pause_rx,
|
|
blocked_connection_tx,
|
|
resume_rx,
|
|
));
|
|
Ok(DisconnectableWebSocketProxy {
|
|
websocket_url,
|
|
pause_tx: Some(pause_tx),
|
|
blocked_connection_rx: Some(blocked_connection_rx),
|
|
resume_tx: Some(resume_tx),
|
|
task,
|
|
})
|
|
}
|
|
|
|
pub(crate) async fn send_request(
|
|
&mut self,
|
|
method: &str,
|
|
params: serde_json::Value,
|
|
) -> anyhow::Result<RequestId> {
|
|
let id = RequestId::Integer(self.next_request_id);
|
|
self.next_request_id += 1;
|
|
self.send_message(JSONRPCMessage::Request(JSONRPCRequest {
|
|
id: id.clone(),
|
|
method: method.to_string(),
|
|
params: Some(params),
|
|
}))
|
|
.await?;
|
|
Ok(id)
|
|
}
|
|
|
|
pub(crate) async fn send_notification(
|
|
&mut self,
|
|
method: &str,
|
|
params: serde_json::Value,
|
|
) -> anyhow::Result<()> {
|
|
self.send_message(JSONRPCMessage::Notification(JSONRPCNotification {
|
|
method: method.to_string(),
|
|
params: Some(params),
|
|
}))
|
|
.await
|
|
}
|
|
|
|
pub(crate) async fn send_raw_text(&mut self, text: &str) -> anyhow::Result<()> {
|
|
self.websocket
|
|
.send(Message::Text(text.to_string().into()))
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn send_raw_binary(&mut self, bytes: Vec<u8>) -> anyhow::Result<()> {
|
|
self.websocket.send(Message::Binary(bytes.into())).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn next_event(&mut self) -> anyhow::Result<JSONRPCMessage> {
|
|
self.next_event_with_timeout(EVENT_TIMEOUT).await
|
|
}
|
|
|
|
pub(crate) async fn wait_for_event<F>(
|
|
&mut self,
|
|
mut predicate: F,
|
|
) -> anyhow::Result<JSONRPCMessage>
|
|
where
|
|
F: FnMut(&JSONRPCMessage) -> bool,
|
|
{
|
|
let deadline = Instant::now() + EVENT_TIMEOUT;
|
|
loop {
|
|
let now = Instant::now();
|
|
if now >= deadline {
|
|
return Err(anyhow!(
|
|
"timed out waiting for matching exec-server event after {EVENT_TIMEOUT:?}"
|
|
));
|
|
}
|
|
let remaining = deadline.duration_since(now);
|
|
let event = self.next_event_with_timeout(remaining).await?;
|
|
if predicate(&event) {
|
|
return Ok(event);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn shutdown(&mut self) -> anyhow::Result<()> {
|
|
self.child.start_kill()?;
|
|
timeout(CONNECT_TIMEOUT, self.child.wait())
|
|
.await
|
|
.map_err(|_| anyhow!("timed out waiting for exec-server shutdown"))??;
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_message(&mut self, message: JSONRPCMessage) -> anyhow::Result<()> {
|
|
let encoded = serde_json::to_string(&message)?;
|
|
self.websocket.send(Message::Text(encoded.into())).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn next_event_with_timeout(
|
|
&mut self,
|
|
timeout_duration: Duration,
|
|
) -> anyhow::Result<JSONRPCMessage> {
|
|
loop {
|
|
let frame = timeout(timeout_duration, self.websocket.next())
|
|
.await
|
|
.map_err(|_| anyhow!("timed out waiting for exec-server websocket event"))?
|
|
.ok_or_else(|| anyhow!("exec-server websocket closed"))??;
|
|
|
|
match frame {
|
|
Message::Text(text) => {
|
|
return Ok(serde_json::from_str(text.as_ref())?);
|
|
}
|
|
Message::Binary(bytes) => {
|
|
return Ok(serde_json::from_slice(bytes.as_ref())?);
|
|
}
|
|
Message::Close(_) => return Err(anyhow!("exec-server websocket closed")),
|
|
Message::Ping(_) | Message::Pong(_) => {}
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl DisconnectableWebSocketProxy {
|
|
pub(crate) fn websocket_url(&self) -> &str {
|
|
&self.websocket_url
|
|
}
|
|
|
|
pub(crate) async fn pause_and_disconnect(&mut self) -> anyhow::Result<()> {
|
|
self.pause_tx
|
|
.take()
|
|
.ok_or_else(|| anyhow!("disconnectable websocket proxy is already paused"))?
|
|
.send(())
|
|
.map_err(|_| anyhow!("disconnectable websocket proxy stopped"))?;
|
|
let blocked_connection_rx = self
|
|
.blocked_connection_rx
|
|
.take()
|
|
.ok_or_else(|| anyhow!("disconnectable websocket proxy is already paused"))?;
|
|
timeout(CONNECT_TIMEOUT, blocked_connection_rx)
|
|
.await
|
|
.map_err(|_| anyhow!("timed out waiting for client reconnect attempt"))?
|
|
.map_err(|_| anyhow!("disconnectable websocket proxy stopped"))?;
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) fn resume(&mut self) -> anyhow::Result<()> {
|
|
self.resume_tx
|
|
.take()
|
|
.ok_or_else(|| anyhow!("disconnectable websocket proxy is already resumed"))?
|
|
.send(())
|
|
.map_err(|_| anyhow!("disconnectable websocket proxy stopped"))?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
async fn run_disconnectable_proxy(
|
|
listener: TcpListener,
|
|
upstream: String,
|
|
pause_rx: oneshot::Receiver<()>,
|
|
blocked_connection_tx: oneshot::Sender<()>,
|
|
mut resume_rx: oneshot::Receiver<()>,
|
|
) {
|
|
let Ok((mut downstream, _)) = listener.accept().await else {
|
|
return;
|
|
};
|
|
let Ok(mut upstream_stream) = TcpStream::connect(&upstream).await else {
|
|
return;
|
|
};
|
|
tokio::select! {
|
|
_ = copy_bidirectional(&mut downstream, &mut upstream_stream) => return,
|
|
_ = pause_rx => {}
|
|
}
|
|
drop(downstream);
|
|
drop(upstream_stream);
|
|
|
|
let mut blocked_connection_tx = Some(blocked_connection_tx);
|
|
loop {
|
|
tokio::select! {
|
|
_ = &mut resume_rx => break,
|
|
accepted = listener.accept() => {
|
|
let Ok((blocked, _)) = accepted else {
|
|
break;
|
|
};
|
|
drop(blocked);
|
|
if let Some(blocked_connection_tx) = blocked_connection_tx.take() {
|
|
let _ = blocked_connection_tx.send(());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
loop {
|
|
let Ok((mut downstream, _)) = listener.accept().await else {
|
|
return;
|
|
};
|
|
let Ok(mut upstream_stream) = TcpStream::connect(&upstream).await else {
|
|
continue;
|
|
};
|
|
let _ = copy_bidirectional(&mut downstream, &mut upstream_stream).await;
|
|
}
|
|
}
|
|
|
|
async fn connect_websocket_when_ready(
|
|
websocket_url: &str,
|
|
) -> anyhow::Result<(
|
|
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
|
|
tokio_tungstenite::tungstenite::handshake::client::Response,
|
|
)> {
|
|
let deadline = Instant::now() + CONNECT_TIMEOUT;
|
|
loop {
|
|
match connect_async(websocket_url).await {
|
|
Ok(websocket) => return Ok(websocket),
|
|
Err(err)
|
|
if Instant::now() < deadline
|
|
&& matches!(
|
|
err,
|
|
tokio_tungstenite::tungstenite::Error::Io(ref io_err)
|
|
if io_err.kind() == std::io::ErrorKind::ConnectionRefused
|
|
) =>
|
|
{
|
|
sleep(CONNECT_RETRY_INTERVAL).await;
|
|
}
|
|
Err(err) => return Err(err.into()),
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn read_listen_url_from_stdout(child: &mut Child) -> anyhow::Result<String> {
|
|
let stdout = child
|
|
.stdout
|
|
.take()
|
|
.ok_or_else(|| anyhow!("failed to capture exec-server stdout"))?;
|
|
let mut lines = BufReader::new(stdout).lines();
|
|
let deadline = Instant::now() + CONNECT_TIMEOUT;
|
|
|
|
loop {
|
|
let now = Instant::now();
|
|
if now >= deadline {
|
|
return Err(anyhow!(
|
|
"timed out waiting for exec-server listen URL on stdout after {CONNECT_TIMEOUT:?}"
|
|
));
|
|
}
|
|
let remaining = deadline.duration_since(now);
|
|
let line = timeout(remaining, lines.next_line())
|
|
.await
|
|
.map_err(|_| anyhow!("timed out waiting for exec-server stdout"))??
|
|
.ok_or_else(|| anyhow!("exec-server stdout closed before emitting listen URL"))?;
|
|
let listen_url = line.trim();
|
|
if listen_url.starts_with("ws://") {
|
|
return Ok(listen_url.to_string());
|
|
}
|
|
}
|
|
}
|