codex/codex-rs/exec-server/tests/common/exec_server.rs
jif cf17e1bc20 Resume exec-server sessions after disconnect (#28512)
Supersedes #28288 (closed).

## Why

A short WebSocket interruption currently ends every client-side process
handle, even though exec-server keeps the server session and its
processes alive for a short time.

This is especially visible for executor-backed stdio MCP servers: a
temporary connection loss becomes a permanent `Transport closed` error.
The server already has the information needed to resume the session, but
the client opens a fresh session instead of using it.

This change reconnects below the process and MCP layers. Existing
process handles stay valid, missed output is recovered, and the same
server-side processes continue running.

## State machine

One logical `ExecServerClient` stays alive while its underlying RPC
connection changes generations.

```text
                         transport closes
       +------------------------------------------------+
       |                                                v
+-------------+                                  +-------------+
|  Connected  |                                  | Recovering  |
+-------------+                                  +-------------+
       ^                                                |
       | session resumed, processes caught up           | retryable error
       +------------------------------------------------+ loops until deadline
                                                        |
                                                        | deadline or permanent error
                                                        v
                                                  +-------------+
                                                  |   Failed    |
                                                  +-------------+
```

### `Connected`

- New RPC calls use the current connection.
- Process notifications are published in sequence order.
- A disconnect only starts recovery if it came from the current
connection generation. Late events from older generations cannot replace
the active connection.
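
The generation rule above can be sketched as a small guard. This is a minimal illustration, not the client's actual code: `ConnectionState`, `ClientState`, and both method names are hypothetical, and generations are assumed to be plain counters starting at 1.

```rust
/// Hypothetical model of the logical client's connection state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    Connected { generation: u64 },
    Recovering { lost_generation: u64 },
    Failed,
}

struct ClientState {
    state: ConnectionState,
}

impl ClientState {
    fn new() -> Self {
        Self {
            state: ConnectionState::Connected { generation: 1 },
        }
    }

    /// Starts recovery only when the closed transport belongs to the current
    /// generation. Returns `true` if recovery was started.
    fn on_transport_closed(&mut self, closed_generation: u64) -> bool {
        match self.state {
            ConnectionState::Connected { generation } if generation == closed_generation => {
                self.state = ConnectionState::Recovering {
                    lost_generation: generation,
                };
                true
            }
            // Late event from an older generation, or recovery already running.
            _ => false,
        }
    }

    /// Installs the replacement connection under the next generation number.
    fn on_session_resumed(&mut self) {
        if let ConnectionState::Recovering { lost_generation } = self.state {
            self.state = ConnectionState::Connected {
                generation: lost_generation + 1,
            };
        }
    }
}
```

With this shape, a close event for generation 1 that arrives after generation 2 is installed is ignored rather than triggering a second recovery.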

### `Recovering`

- New calls wait instead of choosing a half-connected RPC client.
- Existing process handles, wake subscriptions, and event subscriptions
stay open.
- Streaming HTTP response bodies fail immediately because their byte
streams cannot be resumed safely.
- Recovery first waits for process starts that were already in flight. A
start whose result became ambiguous is cleaned up after reconnection
instead of being silently adopted.
- The client reconnects with the learned `session_id`. The server may
briefly report that the old connection is still attached, so that error
is retried until the detach finishes.
- The notification consumer starts before the resume handshake
completes. This prevents a busy process from filling the notification
queue and blocking the initialize response.
- Before installing the new connection, the client catches up every
recoverable process with `process/read`.
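
The "new calls wait" behavior can be sketched as a gate that callers block on until a connection is installed or recovery fails. This is a synchronous sketch using `std::sync` only; the real client is async, and `GateState`, `ConnectionGate`, and the method names are hypothetical.

```rust
use std::sync::Condvar;
use std::sync::Mutex;

/// Hypothetical gate state shared by all callers of the logical client.
#[derive(Debug, Clone, PartialEq, Eq)]
enum GateState {
    Connected { generation: u64 },
    Recovering,
    Failed { message: String },
}

struct ConnectionGate {
    state: Mutex<GateState>,
    changed: Condvar,
}

impl ConnectionGate {
    fn new_recovering() -> Self {
        Self {
            state: Mutex::new(GateState::Recovering),
            changed: Condvar::new(),
        }
    }

    /// Publishes a new state and wakes every waiting caller.
    fn set_state(&self, next: GateState) {
        *self.state.lock().unwrap() = next;
        self.changed.notify_all();
    }

    /// Blocks while recovering. Returns the connection generation to use, or
    /// the single disconnect error once recovery has failed.
    fn wait_for_connection(&self) -> Result<u64, String> {
        let mut state = self.state.lock().unwrap();
        loop {
            let outcome = match &*state {
                GateState::Connected { generation } => Some(Ok(*generation)),
                GateState::Failed { message } => Some(Err(message.clone())),
                GateState::Recovering => None,
            };
            if let Some(outcome) = outcome {
                return outcome;
            }
            state = self.changed.wait(state).unwrap();
        }
    }
}
```

Callers never observe a half-connected client in this model: they see either a fully installed generation or the terminal error.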

### `Failed`

- Recovery stops after 25 seconds or after a permanent error.
- Waiting calls are released with one stable disconnect error.
- Existing process sessions receive a terminal failure instead of
waiting forever.
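
The retry policy that separates `Recovering` from `Failed` can be sketched as a deadline-bounded loop. This is a blocking sketch under stated assumptions: `AttemptError`, `RecoveryFailure`, and `recover_until` are hypothetical names, and the real client would use async timers rather than `std::thread::sleep`.

```rust
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;

/// Hypothetical classification of one reconnect attempt's failure.
#[derive(Debug, PartialEq, Eq)]
enum AttemptError {
    /// For example: transport still down, or the old connection is still attached.
    Retryable,
    /// For example: the server rejected the session outright.
    Permanent,
}

#[derive(Debug, PartialEq, Eq)]
enum RecoveryFailure {
    DeadlineExceeded,
    Permanent,
}

/// Retries `attempt` until it succeeds, fails permanently, or `window` elapses.
/// Returns the number of attempts made on success.
fn recover_until<F>(
    window: Duration,
    retry_interval: Duration,
    mut attempt: F,
) -> Result<u32, RecoveryFailure>
where
    F: FnMut() -> Result<(), AttemptError>,
{
    let deadline = Instant::now() + window;
    let mut attempts = 0;
    loop {
        attempts += 1;
        match attempt() {
            Ok(()) => return Ok(attempts),
            Err(AttemptError::Permanent) => return Err(RecoveryFailure::Permanent),
            Err(AttemptError::Retryable) => {
                if Instant::now() >= deadline {
                    return Err(RecoveryFailure::DeadlineExceeded);
                }
                sleep(retry_interval);
            }
        }
    }
}
```

In the terms of this change, `window` would correspond to the 25-second recovery deadline; the retry interval is not specified here and is left as a parameter.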

## Recovering process events

Output, exit, and close events share one sequence. During normal
operation, the client buffers early events until every lower sequence
has been published.

After reconnection, the client reads each process starting after its
last published sequence:

1. Retained output chunks are inserted by sequence number.
2. Exit and close state are reconstructed in their sequence positions.
3. Events already received as live notifications are ignored as
duplicates.
4. Newly contiguous events are published in order.
5. If the server no longer retains enough output to fill a sequence gap,
only that process is terminated and failed. The recovered connection
remains usable for other processes.
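
The ordering, de-duplication, and gap rules above can be sketched as a per-process reorder buffer. This is an illustration only: `ProcessEvent`, `EventSequencer`, and the method names are hypothetical, and sequence numbers are assumed to start at 1.

```rust
use std::collections::BTreeMap;

/// Hypothetical event kinds that share one per-process sequence.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProcessEvent {
    Output(Vec<u8>),
    Exited(i32),
    Closed,
}

struct EventSequencer {
    /// Next sequence number that may be published.
    next_seq: u64,
    /// Events received early, keyed by sequence number.
    pending: BTreeMap<u64, ProcessEvent>,
}

impl EventSequencer {
    fn new() -> Self {
        Self {
            next_seq: 1,
            pending: BTreeMap::new(),
        }
    }

    /// Accepts an event from a live notification or a catch-up read and
    /// returns the events that became contiguous, in publish order.
    fn accept(&mut self, seq: u64, event: ProcessEvent) -> Vec<ProcessEvent> {
        if seq < self.next_seq || self.pending.contains_key(&seq) {
            return Vec::new(); // already published or already buffered
        }
        self.pending.insert(seq, event);
        let mut ready = Vec::new();
        while let Some(event) = self.pending.remove(&self.next_seq) {
            ready.push(event);
            self.next_seq += 1;
        }
        ready
    }

    /// Call after applying a catch-up read: `true` means some event below the
    /// server's reported next sequence is still missing, so the gap cannot be
    /// filled and this one process should be failed.
    fn has_gap_before(&self, server_next_seq: u64) -> bool {
        self.next_seq < server_next_seq
    }
}
```

Because live notifications and catch-up results go through the same `accept` path, the order in which the two sources deliver an event does not matter.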

The server reports its full next event sequence for unbounded reads,
including exit and close events. Closed processes remain readable for
the same 30-second window used to retain detached sessions.

## Other details

- Detached server sessions are retained for 30 seconds, leaving margin
around the client's 25-second recovery deadline.
- Session attach and detach update the active notification sender under
the same attachment lock, so an old connection cannot clear a newly
attached sender.
- A dedicated error code distinguishes the temporary "session is still
attached" race from permanent initialization errors.
- Process starts are identity-checked on both client and server. Cleanup
from an older start cannot remove a newer process that reused the same
ID.
- Mutating requests that were already in flight when the transport
closed are not replayed, because the client cannot know whether the
server applied them. Requests started after the client has entered
recovery wait for the replacement connection.
- We assume the client and server versions stay in sync, with both sides
either before or after this PR.
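
The identity check on process starts can be sketched as a token comparison before removal. This is a minimal sketch, assuming each start is tagged with a unique token; `ProcessTable` and its methods are hypothetical names, not the actual client or server types.

```rust
use std::collections::HashMap;

/// Hypothetical table mapping a process ID to the token of the start that owns it.
struct ProcessTable {
    next_start_token: u64,
    entries: HashMap<String, u64>,
}

impl ProcessTable {
    fn new() -> Self {
        Self {
            next_start_token: 0,
            entries: HashMap::new(),
        }
    }

    /// Registers a start under `process_id` and returns the token that
    /// identifies this particular start.
    fn register_start(&mut self, process_id: &str) -> u64 {
        self.next_start_token += 1;
        self.entries
            .insert(process_id.to_string(), self.next_start_token);
        self.next_start_token
    }

    /// Removes the entry only if it still belongs to `start_token`.
    /// Returns `true` if an entry was removed.
    fn cleanup_start(&mut self, process_id: &str, start_token: u64) -> bool {
        if self.entries.get(process_id) == Some(&start_token) {
            self.entries.remove(process_id);
            true
        } else {
            false
        }
    }

    fn contains(&self, process_id: &str) -> bool {
        self.entries.contains_key(process_id)
    }
}
```

A stale cleanup that carries the old token then leaves a newer process under the same ID untouched.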

## User impact

Long-running commands and stdio MCP servers can survive a temporary
exec-server WebSocket interruption without changing process IDs or
losing output produced during the outage.
2026-06-17 10:20:39 +02:00


#![allow(dead_code)]

use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use anyhow::anyhow;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::RequestId;
use futures::SinkExt;
use futures::StreamExt;
use tempfile::TempDir;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::copy_bidirectional;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::process::Child;
use tokio::process::Command;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;

const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(25);
const EVENT_TIMEOUT: Duration = Duration::from_secs(5);

/// Test harness that owns a spawned `exec-server` child process and one
/// WebSocket connection to it.
pub(crate) struct ExecServerHarness {
    // Held so the temporary CODEX_HOME outlives the child process.
    _codex_home: TempDir,
    _helper_paths: TestCodexHelperPaths,
    child: Child,
    websocket_url: String,
    websocket: tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >,
    next_request_id: i64,
}

impl Drop for ExecServerHarness {
    fn drop(&mut self) {
        let _ = self.child.start_kill();
    }
}

pub(crate) struct TestCodexHelperPaths {
    pub(crate) codex_exe: PathBuf,
    pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
}

/// TCP proxy in front of the exec-server that a test can pause (dropping the
/// live connection and rejecting new ones) and later resume.
pub(crate) struct DisconnectableWebSocketProxy {
    websocket_url: String,
    pause_tx: Option<oneshot::Sender<()>>,
    blocked_connection_rx: Option<oneshot::Receiver<()>>,
    resume_tx: Option<oneshot::Sender<()>>,
    task: JoinHandle<()>,
}

impl Drop for DisconnectableWebSocketProxy {
    fn drop(&mut self) {
        self.task.abort();
    }
}
pub(crate) fn test_codex_helper_paths() -> anyhow::Result<TestCodexHelperPaths> {
    let (helper_binary, codex_linux_sandbox_exe) = super::current_test_binary_helper_paths()?;
    Ok(TestCodexHelperPaths {
        codex_exe: helper_binary,
        codex_linux_sandbox_exe,
    })
}

pub(crate) async fn exec_server() -> anyhow::Result<ExecServerHarness> {
    exec_server_with_env(std::iter::empty::<(&str, &str)>()).await
}

pub(crate) async fn exec_server_with_env<I, K, V>(env: I) -> anyhow::Result<ExecServerHarness>
where
    I: IntoIterator<Item = (K, V)>,
    K: AsRef<std::ffi::OsStr>,
    V: AsRef<std::ffi::OsStr>,
{
    let helper_paths = test_codex_helper_paths()?;
    let codex_home = TempDir::new()?;
    let mut child = Command::new(&helper_paths.codex_exe);
    child.args(["exec-server", "--listen", "ws://127.0.0.1:0"]);
    child.stdin(Stdio::null());
    child.stdout(Stdio::piped());
    child.stderr(Stdio::inherit());
    child.kill_on_drop(true);
    child.env("CODEX_HOME", codex_home.path());
    child.envs(env);
    let mut child = child.spawn()?;
    let websocket_url = read_listen_url_from_stdout(&mut child).await?;
    let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?;
    Ok(ExecServerHarness {
        _codex_home: codex_home,
        _helper_paths: helper_paths,
        child,
        websocket_url,
        websocket,
        next_request_id: 1,
    })
}
impl ExecServerHarness {
    pub(crate) fn websocket_url(&self) -> &str {
        &self.websocket_url
    }

    pub(crate) async fn disconnect_websocket(&mut self) -> anyhow::Result<()> {
        self.websocket.close(None).await?;
        Ok(())
    }

    pub(crate) async fn reconnect_websocket(&mut self) -> anyhow::Result<()> {
        let (websocket, _) = connect_websocket_when_ready(&self.websocket_url).await?;
        self.websocket = websocket;
        Ok(())
    }

    pub(crate) async fn disconnectable_websocket_proxy(
        &self,
    ) -> anyhow::Result<DisconnectableWebSocketProxy> {
        let upstream = self
            .websocket_url
            .strip_prefix("ws://")
            .ok_or_else(|| anyhow!("exec-server websocket URL must use ws://"))?
            .to_string();
        let listener = TcpListener::bind("127.0.0.1:0").await?;
        let websocket_url = format!("ws://{}", listener.local_addr()?);
        let (pause_tx, pause_rx) = oneshot::channel();
        let (blocked_connection_tx, blocked_connection_rx) = oneshot::channel();
        let (resume_tx, resume_rx) = oneshot::channel();
        let task = tokio::spawn(run_disconnectable_proxy(
            listener,
            upstream,
            pause_rx,
            blocked_connection_tx,
            resume_rx,
        ));
        Ok(DisconnectableWebSocketProxy {
            websocket_url,
            pause_tx: Some(pause_tx),
            blocked_connection_rx: Some(blocked_connection_rx),
            resume_tx: Some(resume_tx),
            task,
        })
    }

    pub(crate) async fn send_request(
        &mut self,
        method: &str,
        params: serde_json::Value,
    ) -> anyhow::Result<RequestId> {
        let id = RequestId::Integer(self.next_request_id);
        self.next_request_id += 1;
        self.send_message(JSONRPCMessage::Request(JSONRPCRequest {
            id: id.clone(),
            method: method.to_string(),
            params: Some(params),
            trace: None,
        }))
        .await?;
        Ok(id)
    }

    pub(crate) async fn send_notification(
        &mut self,
        method: &str,
        params: serde_json::Value,
    ) -> anyhow::Result<()> {
        self.send_message(JSONRPCMessage::Notification(JSONRPCNotification {
            method: method.to_string(),
            params: Some(params),
        }))
        .await
    }

    pub(crate) async fn send_raw_text(&mut self, text: &str) -> anyhow::Result<()> {
        self.websocket
            .send(Message::Text(text.to_string().into()))
            .await?;
        Ok(())
    }

    pub(crate) async fn send_raw_binary(&mut self, bytes: Vec<u8>) -> anyhow::Result<()> {
        self.websocket.send(Message::Binary(bytes.into())).await?;
        Ok(())
    }

    pub(crate) async fn next_event(&mut self) -> anyhow::Result<JSONRPCMessage> {
        self.next_event_with_timeout(EVENT_TIMEOUT).await
    }

    pub(crate) async fn wait_for_event<F>(
        &mut self,
        mut predicate: F,
    ) -> anyhow::Result<JSONRPCMessage>
    where
        F: FnMut(&JSONRPCMessage) -> bool,
    {
        let deadline = Instant::now() + EVENT_TIMEOUT;
        loop {
            let now = Instant::now();
            if now >= deadline {
                return Err(anyhow!(
                    "timed out waiting for matching exec-server event after {EVENT_TIMEOUT:?}"
                ));
            }
            let remaining = deadline.duration_since(now);
            let event = self.next_event_with_timeout(remaining).await?;
            if predicate(&event) {
                return Ok(event);
            }
        }
    }

    pub(crate) async fn shutdown(&mut self) -> anyhow::Result<()> {
        self.child.start_kill()?;
        timeout(CONNECT_TIMEOUT, self.child.wait())
            .await
            .map_err(|_| anyhow!("timed out waiting for exec-server shutdown"))??;
        Ok(())
    }

    async fn send_message(&mut self, message: JSONRPCMessage) -> anyhow::Result<()> {
        let encoded = serde_json::to_string(&message)?;
        self.websocket.send(Message::Text(encoded.into())).await?;
        Ok(())
    }

    async fn next_event_with_timeout(
        &mut self,
        timeout_duration: Duration,
    ) -> anyhow::Result<JSONRPCMessage> {
        loop {
            let frame = timeout(timeout_duration, self.websocket.next())
                .await
                .map_err(|_| anyhow!("timed out waiting for exec-server websocket event"))?
                .ok_or_else(|| anyhow!("exec-server websocket closed"))??;
            match frame {
                Message::Text(text) => {
                    return Ok(serde_json::from_str(text.as_ref())?);
                }
                Message::Binary(bytes) => {
                    return Ok(serde_json::from_slice(bytes.as_ref())?);
                }
                Message::Close(_) => return Err(anyhow!("exec-server websocket closed")),
                Message::Ping(_) | Message::Pong(_) => {}
                _ => {}
            }
        }
    }
}
impl DisconnectableWebSocketProxy {
    pub(crate) fn websocket_url(&self) -> &str {
        &self.websocket_url
    }

    /// Drops the proxied connection, then waits until the proxy has rejected
    /// at least one reconnect attempt. Can only be called once.
    pub(crate) async fn pause_and_disconnect(&mut self) -> anyhow::Result<()> {
        self.pause_tx
            .take()
            .ok_or_else(|| anyhow!("disconnectable websocket proxy is already paused"))?
            .send(())
            .map_err(|_| anyhow!("disconnectable websocket proxy stopped"))?;
        let blocked_connection_rx = self
            .blocked_connection_rx
            .take()
            .ok_or_else(|| anyhow!("disconnectable websocket proxy is already paused"))?;
        timeout(CONNECT_TIMEOUT, blocked_connection_rx)
            .await
            .map_err(|_| anyhow!("timed out waiting for client reconnect attempt"))?
            .map_err(|_| anyhow!("disconnectable websocket proxy stopped"))?;
        Ok(())
    }

    /// Lets the proxy forward connections again. Can only be called once.
    pub(crate) fn resume(&mut self) -> anyhow::Result<()> {
        self.resume_tx
            .take()
            .ok_or_else(|| anyhow!("disconnectable websocket proxy is already resumed"))?
            .send(())
            .map_err(|_| anyhow!("disconnectable websocket proxy stopped"))?;
        Ok(())
    }
}
/// Drives the proxy through three phases: forward the first connection until
/// paused, reject connections while paused, then forward again after resume.
async fn run_disconnectable_proxy(
    listener: TcpListener,
    upstream: String,
    pause_rx: oneshot::Receiver<()>,
    blocked_connection_tx: oneshot::Sender<()>,
    mut resume_rx: oneshot::Receiver<()>,
) {
    // Phase 1: forward the first connection until it ends or a pause is requested.
    let Ok((mut downstream, _)) = listener.accept().await else {
        return;
    };
    let Ok(mut upstream_stream) = TcpStream::connect(&upstream).await else {
        return;
    };
    tokio::select! {
        _ = copy_bidirectional(&mut downstream, &mut upstream_stream) => return,
        _ = pause_rx => {}
    }
    drop(downstream);
    drop(upstream_stream);

    // Phase 2: while paused, accept and immediately drop new connections.
    // The first rejected connection is reported through `blocked_connection_tx`.
    let mut blocked_connection_tx = Some(blocked_connection_tx);
    loop {
        tokio::select! {
            _ = &mut resume_rx => break,
            accepted = listener.accept() => {
                let Ok((blocked, _)) = accepted else {
                    break;
                };
                drop(blocked);
                if let Some(blocked_connection_tx) = blocked_connection_tx.take() {
                    let _ = blocked_connection_tx.send(());
                }
            }
        }
    }

    // Phase 3: after resume, forward connections again, one at a time.
    loop {
        let Ok((mut downstream, _)) = listener.accept().await else {
            return;
        };
        let Ok(mut upstream_stream) = TcpStream::connect(&upstream).await else {
            continue;
        };
        let _ = copy_bidirectional(&mut downstream, &mut upstream_stream).await;
    }
}
async fn connect_websocket_when_ready(
    websocket_url: &str,
) -> anyhow::Result<(
    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
    tokio_tungstenite::tungstenite::handshake::client::Response,
)> {
    let deadline = Instant::now() + CONNECT_TIMEOUT;
    loop {
        match connect_async(websocket_url).await {
            Ok(websocket) => return Ok(websocket),
            Err(err)
                if Instant::now() < deadline
                    && matches!(
                        err,
                        tokio_tungstenite::tungstenite::Error::Io(ref io_err)
                            if io_err.kind() == std::io::ErrorKind::ConnectionRefused
                    ) =>
            {
                sleep(CONNECT_RETRY_INTERVAL).await;
            }
            Err(err) => return Err(err.into()),
        }
    }
}

async fn read_listen_url_from_stdout(child: &mut Child) -> anyhow::Result<String> {
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| anyhow!("failed to capture exec-server stdout"))?;
    let mut lines = BufReader::new(stdout).lines();
    let deadline = Instant::now() + CONNECT_TIMEOUT;
    loop {
        let now = Instant::now();
        if now >= deadline {
            return Err(anyhow!(
                "timed out waiting for exec-server listen URL on stdout after {CONNECT_TIMEOUT:?}"
            ));
        }
        let remaining = deadline.duration_since(now);
        let line = timeout(remaining, lines.next_line())
            .await
            .map_err(|_| anyhow!("timed out waiting for exec-server stdout"))??
            .ok_or_else(|| anyhow!("exec-server stdout closed before emitting listen URL"))?;
        let listen_url = line.trim();
        if listen_url.starts_with("ws://") {
            return Ok(listen_url.to_string());
        }
    }
}