[app-server] expose environment info RPC (#30291)

## Why

App-server clients that configure named execution environments need to
discover an environment's shell and working directory before selecting
it for a thread or turn. Because the environment can run on a different
operating system than app-server, its working directory is represented
as a canonical `file:` URI rather than a host-local path string. The
probe also needs a bounded response time: an exec-server that completes
initialization but never answers `environment/info` must not hold the
environment serialization queue indefinitely.

## What changed

- Add an experimental `environment/info` app-server RPC for named
environments.
- Route the probe through the managed environment connection and return
target-native shell metadata plus the default working directory as a
`PathUri`.
- Return connection and protocol failures as JSON-RPC errors.
- Bound the exec-server probe response to 30 seconds and remove
timed-out calls from the pending-request table so later environment
mutations can proceed.
- Cover successful responses, omitted working directories, unknown
environments, connection failures, and pending-call cleanup.

## Protocol examples

Request:

```json
{
  "id": 42,
  "method": "environment/info",
  "params": {
    "environmentId": "remote-a"
  }
}
```

Successful response:

```json
{
  "id": 42,
  "result": {
    "shell": {
      "name": "zsh",
      "path": "/bin/zsh"
    },
    "cwd": "file:///workspace"
  }
}
```

If the exec-server initializes but does not answer the probe within 30
seconds:

```json
{
  "id": 42,
  "error": {
    "code": -32603,
    "message": "failed to get info for environment `remote-a`: exec-server protocol error: timed out waiting for exec-server `environment/info` response after 30s"
  }
}
```

## Testing

- App-server integration coverage for successful info (including omitted
`cwd`), unknown environments, and connection failures.
- Exec-server RPC coverage verifying a timed-out call is removed from
the pending-request table.

---------

Co-authored-by: Michael Bolin <mbolin@openai.com>
This commit is contained in:
Max Johnson
2026-06-27 12:34:10 -07:00
committed by GitHub
Unverified
parent d2885dc3cd
commit e2398d0b16
13 changed files with 466 additions and 66 deletions
@@ -43,6 +43,8 @@ pub(crate) const GENERATED_TS_HEADER: &str = "// GENERATED CODE! DO NOT MODIFY B
const IGNORED_DEFINITIONS: &[&str] = &["Option<()>"];
const JSON_V1_ALLOWLIST: &[&str] = &["InitializeParams", "InitializeResponse"];
const EXPERIMENTAL_CLIENT_METHOD_DEPENDENCY_TYPES: &[&str] = &[
"EnvironmentShellInfo",
"PathUri",
"RemoteControlClient",
"RemoteControlClientsListOrder",
"ThreadBackgroundTerminal",
@@ -943,6 +943,13 @@ client_request_definitions! {
serialization: global("environment"),
response: v2::EnvironmentAddResponse,
},
#[experimental("environment/info")]
/// Reads information from a configured execution environment.
EnvironmentInfo => "environment/info" {
params: v2::EnvironmentInfoParams,
serialization: global_shared_read("environment"),
response: v2::EnvironmentInfoResponse,
},
McpServerOauthLogin => "mcpServer/oauth/login" {
params: v2::McpServerOauthLoginParams,
@@ -1,3 +1,4 @@
use codex_utils_path_uri::PathUri;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
@@ -19,3 +20,29 @@ pub struct EnvironmentAddParams {
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct EnvironmentAddResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct EnvironmentInfoParams {
pub environment_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct EnvironmentInfoResponse {
pub shell: EnvironmentShellInfo,
/// Default working directory reported by the environment, as a canonical file URI.
pub cwd: Option<PathUri>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct EnvironmentShellInfo {
/// Stable shell name, for example `zsh`, `bash`, `powershell`, `sh`, or `cmd`.
pub name: String,
/// Target-native shell executable path or command name.
pub path: String,
}
+1
View File
@@ -205,6 +205,7 @@ Example with notification opt-out:
- `permissionProfile/list` — beta; list available permission profile ids with optional display `description` text and an `allowed` flag reflecting effective requirements, using cursor pagination. Pass `cwd` when the caller needs project-local `[permissions.<id>]` entries to be included in the current catalog view.
- `experimentalFeature/enablement/set` — patch the in-memory process-wide runtime feature enablement for currently supported feature keys. For each feature, precedence is: cloud requirements > --enable <feature_name> > config.toml > experimentalFeature/enablement/set (new) > code default. Invalid keys will be ignored.
- `environment/add` — experimental; add or replace a named remote environment by `environmentId` and `execServerUrl` for later selection by `thread/start` or `turn/start`; optional `connectTimeoutMs` overrides the WebSocket connection timeout; returns `{}` and does not change the default environment.
- `environment/info` — experimental; connect to a configured environment by `environmentId` and return its detected `shell` plus its default `cwd` as a canonical environment-native `file:` URI. Connection failures are returned as request errors.
- `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination). Built-in presets do not select a model; the Plan preset selects medium reasoning effort. This response omits built-in developer instructions; clients should either pass `settings.developer_instructions: null` when setting a mode to use Codex's built-in instructions, or provide their own instructions explicitly.
- `skills/list` — list skills for one or more `cwd` values (optional `forceReload`).
- `skills/extraRoots/set` — replace the app-server process runtime extra standalone skill roots. The roots are not persisted; missing directories are accepted and simply load no skills.
@@ -1037,6 +1037,9 @@ impl MessageProcessor {
ClientRequest::EnvironmentAdd { params, .. } => {
self.environment_processor.environment_add(params).await
}
ClientRequest::EnvironmentInfo { params, .. } => {
self.environment_processor.environment_info(params).await
}
ClientRequest::FsReadFile { params, .. } => self
.fs_processor
.read_file(params)
@@ -62,6 +62,9 @@ use codex_app_server_protocol::DynamicToolNamespaceTool;
use codex_app_server_protocol::DynamicToolSpec;
use codex_app_server_protocol::EnvironmentAddParams;
use codex_app_server_protocol::EnvironmentAddResponse;
use codex_app_server_protocol::EnvironmentInfoParams;
use codex_app_server_protocol::EnvironmentInfoResponse;
use codex_app_server_protocol::EnvironmentShellInfo;
use codex_app_server_protocol::ExperimentalFeature as ApiExperimentalFeature;
use codex_app_server_protocol::ExperimentalFeatureListParams;
use codex_app_server_protocol::ExperimentalFeatureListResponse;
@@ -26,4 +26,30 @@ impl EnvironmentRequestProcessor {
.map_err(|err| invalid_request(err.to_string()))?;
Ok(Some(EnvironmentAddResponse {}.into()))
}
pub(crate) async fn environment_info(
&self,
params: EnvironmentInfoParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
let environment_id = params.environment_id;
let environment = self
.environment_manager
.get_environment(&environment_id)
.ok_or_else(|| invalid_request(format!("unknown environment id `{environment_id}`")))?;
let info = environment.info().await.map_err(|err| {
internal_error(format!(
"failed to get info for environment `{environment_id}`: {err}"
))
})?;
Ok(Some(
EnvironmentInfoResponse {
shell: EnvironmentShellInfo {
name: info.shell.name,
path: info.shell.path,
},
cwd: info.cwd,
}
.into(),
))
}
}
@@ -0,0 +1,208 @@
use std::time::Duration;
use anyhow::Result;
use app_test_support::TestAppServer;
use app_test_support::to_response;
use codex_app_server_protocol::EnvironmentAddResponse;
use codex_app_server_protocol::EnvironmentInfoResponse;
use codex_app_server_protocol::EnvironmentShellInfo;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_utils_path_uri::PathUri;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::time::timeout;
use super::exec_server_test_support::accept_exec_server_environment;
const RPC_TIMEOUT: Duration = Duration::from_secs(10);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
const INTERNAL_ERROR_CODE: i64 = -32603;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn environment_info_returns_remote_environment_info() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let exec_server_url = format!("ws://{}", listener.local_addr()?);
let exec_server = tokio::spawn(async move {
accept_exec_server_environment(
listener,
json!({
"shell": {"name": "zsh", "path": "/bin/zsh"},
"cwd": "file:///workspace",
}),
)
.await?;
Ok::<_, anyhow::Error>(())
});
let codex_home = TempDir::new()?;
let mut app_server = TestAppServer::new(codex_home.path()).await?;
timeout(RPC_TIMEOUT, app_server.initialize()).await??;
add_environment(
&mut app_server,
&exec_server_url,
/*connect_timeout_ms*/ None,
)
.await?;
let request_id = app_server
.send_raw_request(
"environment/info",
Some(json!({"environmentId": "remote-a"})),
)
.await?;
let response: JSONRPCResponse = timeout(
RPC_TIMEOUT,
app_server.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(
to_response::<EnvironmentInfoResponse>(response)?,
EnvironmentInfoResponse {
shell: EnvironmentShellInfo {
name: "zsh".to_string(),
path: "/bin/zsh".to_string(),
},
cwd: Some(PathUri::parse("file:///workspace")?),
}
);
timeout(RPC_TIMEOUT, exec_server).await???;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn environment_info_accepts_missing_cwd() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let exec_server_url = format!("ws://{}", listener.local_addr()?);
let exec_server = tokio::spawn(async move {
accept_exec_server_environment(
listener,
json!({"shell": {"name": "zsh", "path": "/bin/zsh"}}),
)
.await?;
Ok::<_, anyhow::Error>(())
});
let codex_home = TempDir::new()?;
let mut app_server = TestAppServer::new(codex_home.path()).await?;
timeout(RPC_TIMEOUT, app_server.initialize()).await??;
add_environment(
&mut app_server,
&exec_server_url,
/*connect_timeout_ms*/ None,
)
.await?;
let request_id = app_server
.send_raw_request(
"environment/info",
Some(json!({"environmentId": "remote-a"})),
)
.await?;
let response: JSONRPCResponse = timeout(
RPC_TIMEOUT,
app_server.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(
to_response::<EnvironmentInfoResponse>(response)?,
EnvironmentInfoResponse {
shell: EnvironmentShellInfo {
name: "zsh".to_string(),
path: "/bin/zsh".to_string(),
},
cwd: None,
}
);
timeout(RPC_TIMEOUT, exec_server).await???;
Ok(())
}
#[tokio::test]
async fn environment_info_rejects_unknown_environment() -> Result<()> {
let codex_home = TempDir::new()?;
let mut app_server = TestAppServer::new(codex_home.path()).await?;
timeout(RPC_TIMEOUT, app_server.initialize()).await??;
let request_id = app_server
.send_raw_request(
"environment/info",
Some(json!({"environmentId": "missing"})),
)
.await?;
let error = timeout(
RPC_TIMEOUT,
app_server.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(
error,
JSONRPCError {
id: RequestId::Integer(request_id),
error: JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "unknown environment id `missing`".to_string(),
data: None,
},
}
);
Ok(())
}
#[tokio::test]
async fn environment_info_reports_connection_failure() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let exec_server_url = format!("ws://{}", listener.local_addr()?);
let codex_home = TempDir::new()?;
let mut app_server = TestAppServer::new(codex_home.path()).await?;
timeout(RPC_TIMEOUT, app_server.initialize()).await??;
add_environment(&mut app_server, &exec_server_url, Some(50)).await?;
let request_id = app_server
.send_raw_request(
"environment/info",
Some(json!({"environmentId": "remote-a"})),
)
.await?;
let error = timeout(
RPC_TIMEOUT,
app_server.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(error.error.code, INTERNAL_ERROR_CODE);
assert!(
error
.error
.message
.contains("failed to get info for environment `remote-a`")
);
Ok(())
}
async fn add_environment(
app_server: &mut TestAppServer,
exec_server_url: &str,
connect_timeout_ms: Option<u64>,
) -> Result<()> {
let request_id = app_server
.send_raw_request(
"environment/add",
Some(json!({
"environmentId": "remote-a",
"execServerUrl": exec_server_url,
"connectTimeoutMs": connect_timeout_ms,
})),
)
.await?;
let response: JSONRPCResponse = timeout(
RPC_TIMEOUT,
app_server.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let _: EnvironmentAddResponse = to_response(response)?;
Ok(())
}
@@ -0,0 +1,73 @@
use anyhow::Result;
use futures::SinkExt;
use futures::StreamExt;
use serde_json::Value;
use serde_json::json;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
pub(crate) async fn accept_exec_server_environment(
listener: TcpListener,
environment_info: Value,
) -> Result<WebSocketStream<TcpStream>> {
let mut websocket = accept_initialized_exec_server(listener).await?;
let request = read_exec_server_json(&mut websocket).await?;
assert_eq!(request["method"], "environment/info");
websocket
.send(Message::Text(
json!({
"id": request["id"],
"result": environment_info,
})
.to_string()
.into(),
))
.await?;
Ok(websocket)
}
pub(crate) async fn accept_initialized_exec_server(
listener: TcpListener,
) -> Result<WebSocketStream<TcpStream>> {
let (stream, _) = listener.accept().await?;
let mut websocket = accept_async(stream).await?;
let initialize = read_exec_server_json(&mut websocket).await?;
assert_eq!(initialize["method"], "initialize");
websocket
.send(Message::Text(
json!({
"id": initialize["id"],
"result": {"sessionId": "test-session"},
})
.to_string()
.into(),
))
.await?;
let initialized = read_exec_server_json(&mut websocket).await?;
assert_eq!(initialized["method"], "initialized");
Ok(websocket)
}
pub(crate) async fn read_exec_server_json(
websocket: &mut WebSocketStream<TcpStream>,
) -> Result<Value> {
loop {
match websocket
.next()
.await
.ok_or_else(|| anyhow::anyhow!("exec-server websocket closed"))??
{
Message::Text(text) => return Ok(serde_json::from_str(text.as_ref())?),
Message::Binary(bytes) => return Ok(serde_json::from_slice(bytes.as_ref())?),
Message::Ping(_) | Message::Pong(_) => {}
message => anyhow::bail!("expected JSON-RPC message, got {message:?}"),
}
}
}
+8 -50
View File
@@ -38,7 +38,6 @@ use codex_utils_path_uri::PathUri;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use core_test_support::responses;
use futures::SinkExt;
use futures::StreamExt;
use pretty_assertions::assert_eq;
use rmcp::handler::server::ServerHandler;
use rmcp::model::BooleanSchema;
@@ -64,14 +63,14 @@ use rmcp::transport::streamable_http_server::session::local::LocalSessionManager
use serde_json::json;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
use super::exec_server_test_support::accept_exec_server_environment;
use super::exec_server_test_support::read_exec_server_json;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
const TEST_SERVER_NAME: &str = "tool_server";
const TEST_TOOL_NAME: &str = "echo_tool";
@@ -841,35 +840,11 @@ async fn serve_environment_until_shutdown(
filesystem_request_tx: oneshot::Sender<()>,
mut shutdown_rx: oneshot::Receiver<()>,
) -> Result<()> {
let (stream, _) = listener.accept().await?;
let mut websocket = accept_async(stream).await?;
let initialize = read_exec_server_json(&mut websocket).await?;
assert_eq!(initialize["method"], "initialize");
websocket
.send(Message::Text(
json!({
"id": initialize["id"],
"result": {"sessionId": "test-session"},
})
.to_string()
.into(),
))
.await?;
let initialized = read_exec_server_json(&mut websocket).await?;
assert_eq!(initialized["method"], "initialized");
let environment_info = read_exec_server_json(&mut websocket).await?;
assert_eq!(environment_info["method"], "environment/info");
websocket
.send(Message::Text(
json!({
"id": environment_info["id"],
"result": {"shell": {"name": "zsh", "path": "/bin/zsh"}},
})
.to_string()
.into(),
))
.await?;
let mut websocket = accept_exec_server_environment(
listener,
json!({"shell": {"name": "zsh", "path": "/bin/zsh"}}),
)
.await?;
let mut filesystem_request_tx = Some(filesystem_request_tx);
loop {
@@ -899,23 +874,6 @@ async fn serve_environment_until_shutdown(
}
}
async fn read_exec_server_json(
websocket: &mut WebSocketStream<TcpStream>,
) -> Result<serde_json::Value> {
loop {
match websocket
.next()
.await
.ok_or_else(|| anyhow::anyhow!("exec-server websocket closed"))??
{
Message::Text(text) => return Ok(serde_json::from_str(text.as_ref())?),
Message::Binary(bytes) => return Ok(serde_json::from_slice(bytes.as_ref())?),
Message::Ping(_) | Message::Pong(_) => {}
message => anyhow::bail!("expected JSON-RPC message, got {message:?}"),
}
}
}
async fn wait_for_mcp_tool_call_completed(
mcp: &mut TestAppServer,
call_id: &str,
@@ -15,6 +15,8 @@ mod connection_handling_websocket_unix;
mod current_time;
mod dynamic_tools;
mod environment_add;
mod environment_info;
mod exec_server_test_support;
#[cfg(not(target_os = "windows"))]
mod executor_mcp;
mod executor_skills;
+22 -14
View File
@@ -110,6 +110,7 @@ mod recovery;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
const ENVIRONMENT_INFO_TIMEOUT: Duration = Duration::from_secs(30);
const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256;
const PROCESS_EVENT_RETAINED_BYTES: usize = 1024 * 1024;
@@ -510,7 +511,12 @@ impl ExecServerClient {
}
pub async fn environment_info(&self) -> Result<EnvironmentInfo, ExecServerError> {
self.call(ENVIRONMENT_INFO_METHOD, &()).await
let rpc_client = self.inner.rpc_client().await?;
map_rpc_call_result(
rpc_client
.call_with_timeout(ENVIRONMENT_INFO_METHOD, &(), ENVIRONMENT_INFO_TIMEOUT)
.await,
)
}
pub async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
@@ -780,22 +786,21 @@ impl ExecServerClient {
P: serde::Serialize,
T: serde::de::DeserializeOwned,
{
match rpc_client.call(method, params).await {
Ok(response) => Ok(response),
Err(error) => {
let error = ExecServerError::from(error);
if is_transport_closed_error(&error) {
Err(ExecServerError::Disconnected(disconnected_message(
/*reason*/ None,
)))
} else {
Err(error)
}
}
}
map_rpc_call_result(rpc_client.call(method, params).await)
}
}
fn map_rpc_call_result<T>(result: Result<T, RpcCallError>) -> Result<T, ExecServerError> {
result.map_err(|error| {
let error = ExecServerError::from(error);
if is_transport_closed_error(&error) {
ExecServerError::Disconnected(disconnected_message(/*reason*/ None))
} else {
error
}
})
}
async fn cleanup_process_start(
client: &ExecServerClient,
process_id: &ProcessId,
@@ -822,6 +827,9 @@ impl From<RpcCallError> for ExecServerError {
code: error.code,
message: error.message,
},
RpcCallError::TimedOut { method, timeout } => Self::Protocol(format!(
"timed out waiting for exec-server `{method}` response after {timeout:?}"
)),
}
}
}
+84 -2
View File
@@ -5,6 +5,7 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_exec_server_protocol::JSONRPCError;
use codex_exec_server_protocol::JSONRPCErrorError;
@@ -21,6 +22,7 @@ use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
@@ -36,6 +38,8 @@ pub(crate) enum RpcCallError {
Json(serde_json::Error),
/// The executor returned a JSON-RPC error response for this call.
Server(JSONRPCErrorError),
/// The executor did not return a response before the caller's deadline.
TimedOut { method: String, timeout: Duration },
}
type PendingRequest = oneshot::Sender<Result<Value, RpcCallError>>;
@@ -46,6 +50,11 @@ type RequestRoute<S> = Box<
type NotificationRoute<S> =
Box<dyn Fn(Arc<S>, JSONRPCNotification) -> BoxFuture<Result<(), String>> + Send + Sync>;
enum RpcCallTimeout {
None,
After(Duration),
}
#[derive(Debug)]
pub(crate) enum RpcClientEvent {
Notification(JSONRPCNotification),
@@ -334,6 +343,33 @@ impl RpcClient {
}
pub(crate) async fn call<P, T>(&self, method: &str, params: &P) -> Result<T, RpcCallError>
where
P: Serialize,
T: DeserializeOwned,
{
self.call_inner(method, params, RpcCallTimeout::None).await
}
pub(crate) async fn call_with_timeout<P, T>(
&self,
method: &str,
params: &P,
call_timeout: Duration,
) -> Result<T, RpcCallError>
where
P: Serialize,
T: DeserializeOwned,
{
self.call_inner(method, params, RpcCallTimeout::After(call_timeout))
.await
}
async fn call_inner<P, T>(
&self,
method: &str,
params: &P,
call_timeout: RpcCallTimeout,
) -> Result<T, RpcCallError>
where
P: Serialize,
T: DeserializeOwned,
@@ -379,8 +415,20 @@ impl RpcClient {
// still-pending requests. Awaiting this receiver preserves that order:
// responses already read before EOF still win, and truly pending calls
// are failed once the reader observes the disconnect.
let result: Result<Value, RpcCallError> =
response_rx.await.map_err(|_| RpcCallError::Closed)?;
let response = match call_timeout {
RpcCallTimeout::None => response_rx.await,
RpcCallTimeout::After(call_timeout) => match timeout(call_timeout, response_rx).await {
Ok(response) => response,
Err(_) => {
self.pending.lock().await.remove(&request_id);
return Err(RpcCallError::TimedOut {
method: method.to_string(),
timeout: call_timeout,
});
}
},
};
let result: Result<Value, RpcCallError> = response.map_err(|_| RpcCallError::Closed)?;
let response = match result {
Ok(response) => response,
Err(error) => return Err(error),
@@ -673,6 +721,40 @@ mod tests {
}
}
#[tokio::test]
async fn rpc_client_timeout_removes_pending_request() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (server_writer, client_stdout) = tokio::io::duplex(4096);
let (release_server_tx, release_server_rx) = tokio::sync::oneshot::channel();
let connection =
JsonRpcConnection::from_stdio(client_stdout, client_stdin, "test-rpc".to_string());
let (client, _events_rx) = RpcClient::new(connection);
let server = tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let request = read_jsonrpc_line(&mut lines).await;
assert!(matches!(request, JSONRPCMessage::Request(_)));
let _server_writer = server_writer;
let _ = release_server_rx.await;
});
let call_timeout = Duration::from_millis(10);
let result = client
.call_with_timeout::<_, serde_json::Value>("slow", &serde_json::json!({}), call_timeout)
.await;
assert!(matches!(
result,
Err(super::RpcCallError::TimedOut { method, timeout })
if method == "slow" && timeout == call_timeout
));
assert_eq!(client.pending_request_count().await, 0);
let _ = release_server_tx.send(());
if let Err(err) = server.await {
panic!("server task failed: {err}");
}
}
#[tokio::test(flavor = "current_thread")]
async fn rpc_client_propagates_current_trace_context() {
let span_exporter = InMemorySpanExporter::default();