mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
74dcce594d
## Why Exec-server JSON-RPC calls can cross local and remote transports, but trace context stopped at the RPC boundary. That made client and server work difficult to correlate when diagnosing latency or failures. ## What changed - Propagate the current W3C trace context on outbound JSON-RPC requests. - Parent inbound request spans from received trace context. - Record the received JSON-RPC method on server spans and keep each span open through response enqueue. - Add only the OTEL dependencies required by the exec-server crate. ## Stack Review and land this stack in order: 1. #27466 — trace exec-server JSON-RPC requests **(this PR)** 2. #27467 — record bounded connection, request, and process lifecycle metrics 3. #27470 — observe remote registration and Noise rendezvous lifecycle ## Validation - `just test -p codex-exec-server --lib` (153 passed) - `just bazel-lock-check` - `just fix -p codex-exec-server`
126 lines
4.4 KiB
Rust
126 lines
4.4 KiB
Rust
#![cfg(unix)]
|
|
|
|
mod common;
|
|
|
|
use codex_exec_server::InitializeParams;
|
|
use codex_exec_server::InitializeResponse;
|
|
use codex_exec_server_protocol::JSONRPCError;
|
|
use codex_exec_server_protocol::JSONRPCMessage;
|
|
use codex_exec_server_protocol::JSONRPCResponse;
|
|
use common::exec_server::exec_server;
|
|
use pretty_assertions::assert_eq;
|
|
use tokio_tungstenite::connect_async;
|
|
use tokio_tungstenite::tungstenite::Error as WebSocketError;
|
|
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
|
use tokio_tungstenite::tungstenite::http::HeaderValue;
|
|
use tokio_tungstenite::tungstenite::http::StatusCode;
|
|
use tokio_tungstenite::tungstenite::http::header::ORIGIN;
|
|
use uuid::Uuid;
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> anyhow::Result<()> {
|
|
let mut server = exec_server().await?;
|
|
server.send_raw_text("not-json").await?;
|
|
|
|
let response = server
|
|
.wait_for_event(|event| matches!(event, JSONRPCMessage::Error(_)))
|
|
.await?;
|
|
let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else {
|
|
panic!("expected malformed-message error response");
|
|
};
|
|
assert_eq!(id, codex_exec_server_protocol::RequestId::Integer(-1));
|
|
assert_eq!(error.code, -32600);
|
|
assert!(
|
|
error
|
|
.message
|
|
.starts_with("failed to parse websocket JSON-RPC message from exec-server websocket"),
|
|
"unexpected malformed-message error: {}",
|
|
error.message
|
|
);
|
|
|
|
let initialize_id = server
|
|
.send_request(
|
|
"initialize",
|
|
serde_json::to_value(InitializeParams {
|
|
client_name: "exec-server-test".to_string(),
|
|
resume_session_id: None,
|
|
})?,
|
|
)
|
|
.await?;
|
|
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
|
|
panic!("expected initialize response after malformed input");
|
|
};
|
|
assert_eq!(id, initialize_id);
|
|
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
|
|
Uuid::parse_str(&initialize_response.session_id)?;
|
|
|
|
server.shutdown().await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn exec_server_accepts_binary_websocket_json() -> anyhow::Result<()> {
|
|
let mut server = exec_server().await?;
|
|
let initialize_id = codex_exec_server_protocol::RequestId::Integer(1);
|
|
let initialize = JSONRPCMessage::Request(codex_exec_server_protocol::JSONRPCRequest {
|
|
id: initialize_id.clone(),
|
|
method: "initialize".to_string(),
|
|
params: Some(serde_json::to_value(InitializeParams {
|
|
client_name: "exec-server-binary-test".to_string(),
|
|
resume_session_id: None,
|
|
})?),
|
|
trace: None,
|
|
});
|
|
server
|
|
.send_raw_binary(serde_json::to_vec(&initialize)?)
|
|
.await?;
|
|
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
|
|
panic!("expected initialize response for binary input");
|
|
};
|
|
assert_eq!(id, initialize_id);
|
|
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
|
|
Uuid::parse_str(&initialize_response.session_id)?;
|
|
|
|
server.shutdown().await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn exec_server_rejects_browser_origin_websocket_handshake() -> anyhow::Result<()> {
|
|
let mut server = exec_server().await?;
|
|
let mut request = server.websocket_url().into_client_request()?;
|
|
request
|
|
.headers_mut()
|
|
.insert(ORIGIN, HeaderValue::from_static("https://evil.example"));
|
|
|
|
let error = match connect_async(request).await {
|
|
Ok(_) => anyhow::bail!("browser-origin websocket handshake should be rejected"),
|
|
Err(error) => error,
|
|
};
|
|
let WebSocketError::Http(response) = error else {
|
|
anyhow::bail!("browser-origin websocket handshake failed unexpectedly: {error}");
|
|
};
|
|
assert_eq!(response.status(), StatusCode::FORBIDDEN);
|
|
|
|
server.shutdown().await?;
|
|
Ok(())
|
|
}
|