From e2398d0b1667f0ae71ecb4f86ceb9fcc88b315e4 Mon Sep 17 00:00:00 2001 From: Max Johnson <162359438+maxj-oai@users.noreply.github.com> Date: Sat, 27 Jun 2026 12:34:10 -0700 Subject: [PATCH] [app-server] expose environment info RPC (#30291) ## Why App-server clients that configure named execution environments need to discover an environment's shell and working directory before selecting it for a thread or turn. Because the environment can run on a different operating system than app-server, its working directory is represented as a canonical `file:` URI rather than a host-local path string. The probe also needs a bounded response time: an exec-server that completes initialization but never answers `environment/info` must not hold the environment serialization queue indefinitely. ## What changed - Add an experimental `environment/info` app-server RPC for named environments. - Route the probe through the managed environment connection and return target-native shell metadata plus the default working directory as a `PathUri`. - Return connection and protocol failures as JSON-RPC errors. - Bound the exec-server probe response to 30 seconds and remove timed-out calls from the pending-request table so later environment mutations can proceed. - Cover successful responses, omitted working directories, unknown environments, connection failures, and pending-call cleanup. ## Protocol examples Request: ```json { "id": 42, "method": "environment/info", "params": { "environmentId": "remote-a" } } ``` Successful response: ```json { "id": 42, "result": { "shell": { "name": "zsh", "path": "/bin/zsh" }, "cwd": "file:///workspace" } } ``` If the exec-server initializes but does not answer the probe within 30 seconds: ```json { "id": 42, "error": { "code": -32603, "message": "failed to get info for environment `remote-a`: exec-server protocol error: timed out waiting for exec-server `environment/info` response after 30s" } } ``` ## Testing - App-server integration coverage for successful info (including omitted `cwd`), unknown environments, and connection failures. - Exec-server RPC coverage verifying a timed-out call is removed from the pending-request table. --------- Co-authored-by: Michael Bolin --- codex-rs/app-server-protocol/src/export.rs | 2 + .../src/protocol/common.rs | 7 + .../src/protocol/v2/environment.rs | 27 +++ codex-rs/app-server/README.md | 1 + codex-rs/app-server/src/message_processor.rs | 3 + codex-rs/app-server/src/request_processors.rs | 3 + .../environment_processor.rs | 26 +++ .../tests/suite/v2/environment_info.rs | 208 ++++++++++++++++++ .../suite/v2/exec_server_test_support.rs | 73 ++++++ .../app-server/tests/suite/v2/mcp_tool.rs | 58 +---- codex-rs/app-server/tests/suite/v2/mod.rs | 2 + codex-rs/exec-server/src/client.rs | 36 +-- codex-rs/exec-server/src/rpc.rs | 86 +++++++- 13 files changed, 466 insertions(+), 66 deletions(-) create mode 100644 codex-rs/app-server/tests/suite/v2/environment_info.rs create mode 100644 codex-rs/app-server/tests/suite/v2/exec_server_test_support.rs diff --git a/codex-rs/app-server-protocol/src/export.rs b/codex-rs/app-server-protocol/src/export.rs index 91eca690f..9f27dff7f 100644 --- a/codex-rs/app-server-protocol/src/export.rs +++ b/codex-rs/app-server-protocol/src/export.rs @@ -43,6 +43,8 @@ pub(crate) const GENERATED_TS_HEADER: &str = "// GENERATED CODE! DO NOT MODIFY B const IGNORED_DEFINITIONS: &[&str] = &["Option<()>"]; const JSON_V1_ALLOWLIST: &[&str] = &["InitializeParams", "InitializeResponse"]; const EXPERIMENTAL_CLIENT_METHOD_DEPENDENCY_TYPES: &[&str] = &[ + "EnvironmentShellInfo", + "PathUri", "RemoteControlClient", "RemoteControlClientsListOrder", "ThreadBackgroundTerminal", diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 7b1503557..d8db99d4f 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -943,6 +943,13 @@ client_request_definitions! { serialization: global("environment"), response: v2::EnvironmentAddResponse, }, + #[experimental("environment/info")] + /// Reads information from a configured execution environment. + EnvironmentInfo => "environment/info" { + params: v2::EnvironmentInfoParams, + serialization: global_shared_read("environment"), + response: v2::EnvironmentInfoResponse, + }, McpServerOauthLogin => "mcpServer/oauth/login" { params: v2::McpServerOauthLoginParams, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/environment.rs b/codex-rs/app-server-protocol/src/protocol/v2/environment.rs index ccffd5813..320e2af89 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/environment.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/environment.rs @@ -1,3 +1,4 @@ +use codex_utils_path_uri::PathUri; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -19,3 +20,29 @@ pub struct EnvironmentAddParams { #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] pub struct EnvironmentAddResponse {} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct EnvironmentInfoParams { + pub environment_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct EnvironmentInfoResponse { + pub shell: EnvironmentShellInfo, + /// Default working directory reported by the environment, as a canonical file URI. + pub cwd: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct EnvironmentShellInfo { + /// Stable shell name, for example `zsh`, `bash`, `powershell`, `sh`, or `cmd`. + pub name: String, + /// Target-native shell executable path or command name. + pub path: String, +} diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 6f34cae54..c9ac2f5fb 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -205,6 +205,7 @@ Example with notification opt-out: - `permissionProfile/list` — beta; list available permission profile ids with optional display `description` text and an `allowed` flag reflecting effective requirements, using cursor pagination. Pass `cwd` when the caller needs project-local `[permissions.]` entries to be included in the current catalog view. - `experimentalFeature/enablement/set` — patch the in-memory process-wide runtime feature enablement for currently supported feature keys. For each feature, precedence is: cloud requirements > --enable > config.toml > experimentalFeature/enablement/set (new) > code default. Invalid keys will be ignored. - `environment/add` — experimental; add or replace a named remote environment by `environmentId` and `execServerUrl` for later selection by `thread/start` or `turn/start`; optional `connectTimeoutMs` overrides the WebSocket connection timeout; returns `{}` and does not change the default environment. +- `environment/info` — experimental; connect to a configured environment by `environmentId` and return its detected `shell` plus its default `cwd` as a canonical environment-native `file:` URI. Connection failures are returned as request errors. - `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination). Built-in presets do not select a model; the Plan preset selects medium reasoning effort. This response omits built-in developer instructions; clients should either pass `settings.developer_instructions: null` when setting a mode to use Codex's built-in instructions, or provide their own instructions explicitly. - `skills/list` — list skills for one or more `cwd` values (optional `forceReload`). - `skills/extraRoots/set` — replace the app-server process runtime extra standalone skill roots. The roots are not persisted; missing directories are accepted and simply load no skills. diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 3322f24d8..375307e80 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -1037,6 +1037,9 @@ impl MessageProcessor { ClientRequest::EnvironmentAdd { params, .. } => { self.environment_processor.environment_add(params).await } + ClientRequest::EnvironmentInfo { params, .. } => { + self.environment_processor.environment_info(params).await + } ClientRequest::FsReadFile { params, .. } => self .fs_processor .read_file(params) diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index cb0f276d8..587325c24 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -62,6 +62,9 @@ use codex_app_server_protocol::DynamicToolNamespaceTool; use codex_app_server_protocol::DynamicToolSpec; use codex_app_server_protocol::EnvironmentAddParams; use codex_app_server_protocol::EnvironmentAddResponse; +use codex_app_server_protocol::EnvironmentInfoParams; +use codex_app_server_protocol::EnvironmentInfoResponse; +use codex_app_server_protocol::EnvironmentShellInfo; use codex_app_server_protocol::ExperimentalFeature as ApiExperimentalFeature; use codex_app_server_protocol::ExperimentalFeatureListParams; use codex_app_server_protocol::ExperimentalFeatureListResponse; diff --git a/codex-rs/app-server/src/request_processors/environment_processor.rs b/codex-rs/app-server/src/request_processors/environment_processor.rs index 36533aa23..635e1732e 100644 --- a/codex-rs/app-server/src/request_processors/environment_processor.rs +++ b/codex-rs/app-server/src/request_processors/environment_processor.rs @@ -26,4 +26,30 @@ impl EnvironmentRequestProcessor { .map_err(|err| invalid_request(err.to_string()))?; Ok(Some(EnvironmentAddResponse {}.into())) } + + pub(crate) async fn environment_info( + &self, + params: EnvironmentInfoParams, + ) -> Result, JSONRPCErrorError> { + let environment_id = params.environment_id; + let environment = self + .environment_manager + .get_environment(&environment_id) + .ok_or_else(|| invalid_request(format!("unknown environment id `{environment_id}`")))?; + let info = environment.info().await.map_err(|err| { + internal_error(format!( + "failed to get info for environment `{environment_id}`: {err}" + )) + })?; + Ok(Some( + EnvironmentInfoResponse { + shell: EnvironmentShellInfo { + name: info.shell.name, + path: info.shell.path, + }, + cwd: info.cwd, + } + .into(), + )) + } } diff --git a/codex-rs/app-server/tests/suite/v2/environment_info.rs b/codex-rs/app-server/tests/suite/v2/environment_info.rs new file mode 100644 index 000000000..8b0e82518 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/environment_info.rs @@ -0,0 +1,208 @@ +use std::time::Duration; + +use anyhow::Result; +use app_test_support::TestAppServer; +use app_test_support::to_response; +use codex_app_server_protocol::EnvironmentAddResponse; +use codex_app_server_protocol::EnvironmentInfoResponse; +use codex_app_server_protocol::EnvironmentShellInfo; +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCErrorError; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_utils_path_uri::PathUri; +use pretty_assertions::assert_eq; +use serde_json::json; +use tempfile::TempDir; +use tokio::net::TcpListener; +use tokio::time::timeout; + +use super::exec_server_test_support::accept_exec_server_environment; + +const RPC_TIMEOUT: Duration = Duration::from_secs(10); +const INVALID_REQUEST_ERROR_CODE: i64 = -32600; +const INTERNAL_ERROR_CODE: i64 = -32603; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn environment_info_returns_remote_environment_info() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let exec_server_url = format!("ws://{}", listener.local_addr()?); + let exec_server = tokio::spawn(async move { + accept_exec_server_environment( + listener, + json!({ + "shell": {"name": "zsh", "path": "/bin/zsh"}, + "cwd": "file:///workspace", + }), + ) + .await?; + Ok::<_, anyhow::Error>(()) + }); + + let codex_home = TempDir::new()?; + let mut app_server = TestAppServer::new(codex_home.path()).await?; + timeout(RPC_TIMEOUT, app_server.initialize()).await??; + add_environment( + &mut app_server, + &exec_server_url, + /*connect_timeout_ms*/ None, + ) + .await?; + + let request_id = app_server + .send_raw_request( + "environment/info", + Some(json!({"environmentId": "remote-a"})), + ) + .await?; + let response: JSONRPCResponse = timeout( + RPC_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + assert_eq!( + to_response::(response)?, + EnvironmentInfoResponse { + shell: EnvironmentShellInfo { + name: "zsh".to_string(), + path: "/bin/zsh".to_string(), + }, + cwd: Some(PathUri::parse("file:///workspace")?), + } + ); + timeout(RPC_TIMEOUT, exec_server).await???; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn environment_info_accepts_missing_cwd() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let exec_server_url = format!("ws://{}", listener.local_addr()?); + let exec_server = tokio::spawn(async move { + accept_exec_server_environment( + listener, + json!({"shell": {"name": "zsh", "path": "/bin/zsh"}}), + ) + .await?; + Ok::<_, anyhow::Error>(()) + }); + + let codex_home = TempDir::new()?; + let mut app_server = TestAppServer::new(codex_home.path()).await?; + timeout(RPC_TIMEOUT, app_server.initialize()).await??; + add_environment( + &mut app_server, + &exec_server_url, + /*connect_timeout_ms*/ None, + ) + .await?; + + let request_id = app_server + .send_raw_request( + "environment/info", + Some(json!({"environmentId": "remote-a"})), + ) + .await?; + let response: JSONRPCResponse = timeout( + RPC_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + assert_eq!( + to_response::(response)?, + EnvironmentInfoResponse { + shell: EnvironmentShellInfo { + name: "zsh".to_string(), + path: "/bin/zsh".to_string(), + }, + cwd: None, + } + ); + timeout(RPC_TIMEOUT, exec_server).await???; + Ok(()) +} + +#[tokio::test] +async fn environment_info_rejects_unknown_environment() -> Result<()> { + let codex_home = TempDir::new()?; + let mut app_server = TestAppServer::new(codex_home.path()).await?; + timeout(RPC_TIMEOUT, app_server.initialize()).await??; + + let request_id = app_server + .send_raw_request( + "environment/info", + Some(json!({"environmentId": "missing"})), + ) + .await?; + let error = timeout( + RPC_TIMEOUT, + app_server.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + assert_eq!( + error, + JSONRPCError { + id: RequestId::Integer(request_id), + error: JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: "unknown environment id `missing`".to_string(), + data: None, + }, + } + ); + Ok(()) +} + +#[tokio::test] +async fn environment_info_reports_connection_failure() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let exec_server_url = format!("ws://{}", listener.local_addr()?); + let codex_home = TempDir::new()?; + let mut app_server = TestAppServer::new(codex_home.path()).await?; + timeout(RPC_TIMEOUT, app_server.initialize()).await??; + add_environment(&mut app_server, &exec_server_url, Some(50)).await?; + + let request_id = app_server + .send_raw_request( + "environment/info", + Some(json!({"environmentId": "remote-a"})), + ) + .await?; + let error = timeout( + RPC_TIMEOUT, + app_server.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + assert_eq!(error.error.code, INTERNAL_ERROR_CODE); + assert!( + error + .error + .message + .contains("failed to get info for environment `remote-a`") + ); + Ok(()) +} + +async fn add_environment( + app_server: &mut TestAppServer, + exec_server_url: &str, + connect_timeout_ms: Option, +) -> Result<()> { + let request_id = app_server + .send_raw_request( + "environment/add", + Some(json!({ + "environmentId": "remote-a", + "execServerUrl": exec_server_url, + "connectTimeoutMs": connect_timeout_ms, + })), + ) + .await?; + let response: JSONRPCResponse = timeout( + RPC_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let _: EnvironmentAddResponse = to_response(response)?; + Ok(()) +} diff --git a/codex-rs/app-server/tests/suite/v2/exec_server_test_support.rs b/codex-rs/app-server/tests/suite/v2/exec_server_test_support.rs new file mode 100644 index 000000000..f28e06e9f --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/exec_server_test_support.rs @@ -0,0 +1,73 @@ +use anyhow::Result; +use futures::SinkExt; +use futures::StreamExt; +use serde_json::Value; +use serde_json::json; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::accept_async; +use tokio_tungstenite::tungstenite::Message; + +pub(crate) async fn accept_exec_server_environment( + listener: TcpListener, + environment_info: Value, +) -> Result> { + let mut websocket = accept_initialized_exec_server(listener).await?; + + let request = read_exec_server_json(&mut websocket).await?; + assert_eq!(request["method"], "environment/info"); + websocket + .send(Message::Text( + json!({ + "id": request["id"], + "result": environment_info, + }) + .to_string() + .into(), + )) + .await?; + + Ok(websocket) +} + +pub(crate) async fn accept_initialized_exec_server( + listener: TcpListener, +) -> Result> { + let (stream, _) = listener.accept().await?; + let mut websocket = accept_async(stream).await?; + + let initialize = read_exec_server_json(&mut websocket).await?; + assert_eq!(initialize["method"], "initialize"); + websocket + .send(Message::Text( + json!({ + "id": initialize["id"], + "result": {"sessionId": "test-session"}, + }) + .to_string() + .into(), + )) + .await?; + let initialized = read_exec_server_json(&mut websocket).await?; + assert_eq!(initialized["method"], "initialized"); + + Ok(websocket) +} + +pub(crate) async fn read_exec_server_json( + websocket: &mut WebSocketStream, +) -> Result { + loop { + match websocket + .next() + .await + .ok_or_else(|| anyhow::anyhow!("exec-server websocket closed"))?? + { + Message::Text(text) => return Ok(serde_json::from_str(text.as_ref())?), + Message::Binary(bytes) => return Ok(serde_json::from_slice(bytes.as_ref())?), + Message::Ping(_) | Message::Pong(_) => {} + message => anyhow::bail!("expected JSON-RPC message, got {message:?}"), + } + } +} diff --git a/codex-rs/app-server/tests/suite/v2/mcp_tool.rs b/codex-rs/app-server/tests/suite/v2/mcp_tool.rs index f3a387245..1f3289e43 100644 --- a/codex-rs/app-server/tests/suite/v2/mcp_tool.rs +++ b/codex-rs/app-server/tests/suite/v2/mcp_tool.rs @@ -38,7 +38,6 @@ use codex_utils_path_uri::PathUri; use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP; use core_test_support::responses; use futures::SinkExt; -use futures::StreamExt; use pretty_assertions::assert_eq; use rmcp::handler::server::ServerHandler; use rmcp::model::BooleanSchema; @@ -64,14 +63,14 @@ use rmcp::transport::streamable_http_server::session::local::LocalSessionManager use serde_json::json; use tempfile::TempDir; use tokio::net::TcpListener; -use tokio::net::TcpStream; use tokio::sync::oneshot; use tokio::task::JoinHandle; use tokio::time::timeout; -use tokio_tungstenite::WebSocketStream; -use tokio_tungstenite::accept_async; use tokio_tungstenite::tungstenite::Message; +use super::exec_server_test_support::accept_exec_server_environment; +use super::exec_server_test_support::read_exec_server_json; + const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10); const TEST_SERVER_NAME: &str = "tool_server"; const TEST_TOOL_NAME: &str = "echo_tool"; @@ -841,35 +840,11 @@ async fn serve_environment_until_shutdown( filesystem_request_tx: oneshot::Sender<()>, mut shutdown_rx: oneshot::Receiver<()>, ) -> Result<()> { - let (stream, _) = listener.accept().await?; - let mut websocket = accept_async(stream).await?; - - let initialize = read_exec_server_json(&mut websocket).await?; - assert_eq!(initialize["method"], "initialize"); - websocket - .send(Message::Text( - json!({ - "id": initialize["id"], - "result": {"sessionId": "test-session"}, - }) - .to_string() - .into(), - )) - .await?; - let initialized = read_exec_server_json(&mut websocket).await?; - assert_eq!(initialized["method"], "initialized"); - let environment_info = read_exec_server_json(&mut websocket).await?; - assert_eq!(environment_info["method"], "environment/info"); - websocket - .send(Message::Text( - json!({ - "id": environment_info["id"], - "result": {"shell": {"name": "zsh", "path": "/bin/zsh"}}, - }) - .to_string() - .into(), - )) - .await?; + let mut websocket = accept_exec_server_environment( + listener, + json!({"shell": {"name": "zsh", "path": "/bin/zsh"}}), + ) + .await?; let mut filesystem_request_tx = Some(filesystem_request_tx); loop { @@ -899,23 +874,6 @@ async fn serve_environment_until_shutdown( } } -async fn read_exec_server_json( - websocket: &mut WebSocketStream, -) -> Result { - loop { - match websocket - .next() - .await - .ok_or_else(|| anyhow::anyhow!("exec-server websocket closed"))?? - { - Message::Text(text) => return Ok(serde_json::from_str(text.as_ref())?), - Message::Binary(bytes) => return Ok(serde_json::from_slice(bytes.as_ref())?), - Message::Ping(_) | Message::Pong(_) => {} - message => anyhow::bail!("expected JSON-RPC message, got {message:?}"), - } - } -} - async fn wait_for_mcp_tool_call_completed( mcp: &mut TestAppServer, call_id: &str, diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 9963d37be..bdb03271d 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -15,6 +15,8 @@ mod connection_handling_websocket_unix; mod current_time; mod dynamic_tools; mod environment_add; +mod environment_info; +mod exec_server_test_support; #[cfg(not(target_os = "windows"))] mod executor_mcp; mod executor_skills; diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index e5f813cc4..82412a58e 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -110,6 +110,7 @@ mod recovery; const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10); +const ENVIRONMENT_INFO_TIMEOUT: Duration = Duration::from_secs(30); const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256; const PROCESS_EVENT_RETAINED_BYTES: usize = 1024 * 1024; @@ -510,7 +511,12 @@ impl ExecServerClient { } pub async fn environment_info(&self) -> Result { - self.call(ENVIRONMENT_INFO_METHOD, &()).await + let rpc_client = self.inner.rpc_client().await?; + map_rpc_call_result( + rpc_client + .call_with_timeout(ENVIRONMENT_INFO_METHOD, &(), ENVIRONMENT_INFO_TIMEOUT) + .await, + ) } pub async fn read(&self, params: ReadParams) -> Result { @@ -780,22 +786,21 @@ impl ExecServerClient { P: serde::Serialize, T: serde::de::DeserializeOwned, { - match rpc_client.call(method, params).await { - Ok(response) => Ok(response), - Err(error) => { - let error = ExecServerError::from(error); - if is_transport_closed_error(&error) { - Err(ExecServerError::Disconnected(disconnected_message( - /*reason*/ None, - ))) - } else { - Err(error) - } - } - } + map_rpc_call_result(rpc_client.call(method, params).await) } } +fn map_rpc_call_result(result: Result) -> Result { + result.map_err(|error| { + let error = ExecServerError::from(error); + if is_transport_closed_error(&error) { + ExecServerError::Disconnected(disconnected_message(/*reason*/ None)) + } else { + error + } + }) +} + async fn cleanup_process_start( client: &ExecServerClient, process_id: &ProcessId, @@ -822,6 +827,9 @@ impl From for ExecServerError { code: error.code, message: error.message, }, + RpcCallError::TimedOut { method, timeout } => Self::Protocol(format!( + "timed out waiting for exec-server `{method}` response after {timeout:?}" + )), } } } diff --git a/codex-rs/exec-server/src/rpc.rs b/codex-rs/exec-server/src/rpc.rs index a941dce41..46a0bd14e 100644 --- a/codex-rs/exec-server/src/rpc.rs +++ b/codex-rs/exec-server/src/rpc.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; +use std::time::Duration; use codex_exec_server_protocol::JSONRPCError; use codex_exec_server_protocol::JSONRPCErrorError; @@ -21,6 +22,7 @@ use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::watch; use tokio::task::JoinHandle; +use tokio::time::timeout; use crate::connection::JsonRpcConnection; use crate::connection::JsonRpcConnectionEvent; @@ -36,6 +38,8 @@ pub(crate) enum RpcCallError { Json(serde_json::Error), /// The executor returned a JSON-RPC error response for this call. Server(JSONRPCErrorError), + /// The executor did not return a response before the caller's deadline. + TimedOut { method: String, timeout: Duration }, } type PendingRequest = oneshot::Sender>; @@ -46,6 +50,11 @@ type RequestRoute = Box< type NotificationRoute = Box, JSONRPCNotification) -> BoxFuture> + Send + Sync>; +enum RpcCallTimeout { + None, + After(Duration), +} + #[derive(Debug)] pub(crate) enum RpcClientEvent { Notification(JSONRPCNotification), @@ -334,6 +343,33 @@ impl RpcClient { } pub(crate) async fn call(&self, method: &str, params: &P) -> Result + where + P: Serialize, + T: DeserializeOwned, + { + self.call_inner(method, params, RpcCallTimeout::None).await + } + + pub(crate) async fn call_with_timeout( + &self, + method: &str, + params: &P, + call_timeout: Duration, + ) -> Result + where + P: Serialize, + T: DeserializeOwned, + { + self.call_inner(method, params, RpcCallTimeout::After(call_timeout)) + .await + } + + async fn call_inner( + &self, + method: &str, + params: &P, + call_timeout: RpcCallTimeout, + ) -> Result where P: Serialize, T: DeserializeOwned, @@ -379,8 +415,20 @@ impl RpcClient { // still-pending requests. Awaiting this receiver preserves that order: // responses already read before EOF still win, and truly pending calls // are failed once the reader observes the disconnect. - let result: Result = - response_rx.await.map_err(|_| RpcCallError::Closed)?; + let response = match call_timeout { + RpcCallTimeout::None => response_rx.await, + RpcCallTimeout::After(call_timeout) => match timeout(call_timeout, response_rx).await { + Ok(response) => response, + Err(_) => { + self.pending.lock().await.remove(&request_id); + return Err(RpcCallError::TimedOut { + method: method.to_string(), + timeout: call_timeout, + }); + } + }, + }; + let result: Result = response.map_err(|_| RpcCallError::Closed)?; let response = match result { Ok(response) => response, Err(error) => return Err(error), @@ -673,6 +721,40 @@ mod tests { } } + #[tokio::test] + async fn rpc_client_timeout_removes_pending_request() { + let (client_stdin, server_reader) = tokio::io::duplex(4096); + let (server_writer, client_stdout) = tokio::io::duplex(4096); + let (release_server_tx, release_server_rx) = tokio::sync::oneshot::channel(); + let connection = + JsonRpcConnection::from_stdio(client_stdout, client_stdin, "test-rpc".to_string()); + let (client, _events_rx) = RpcClient::new(connection); + + let server = tokio::spawn(async move { + let mut lines = BufReader::new(server_reader).lines(); + let request = read_jsonrpc_line(&mut lines).await; + assert!(matches!(request, JSONRPCMessage::Request(_))); + let _server_writer = server_writer; + let _ = release_server_rx.await; + }); + + let call_timeout = Duration::from_millis(10); + let result = client + .call_with_timeout::<_, serde_json::Value>("slow", &serde_json::json!({}), call_timeout) + .await; + assert!(matches!( + result, + Err(super::RpcCallError::TimedOut { method, timeout }) + if method == "slow" && timeout == call_timeout + )); + assert_eq!(client.pending_request_count().await, 0); + + let _ = release_server_tx.send(()); + if let Err(err) = server.await { + panic!("server task failed: {err}"); + } + } + #[tokio::test(flavor = "current_thread")] async fn rpc_client_propagates_current_trace_context() { let span_exporter = InMemorySpanExporter::default();