[codex] Trace exec-server JSON-RPC requests (#27466)

## Why

Exec-server JSON-RPC calls can cross local and remote transports, but
trace context stopped at the RPC boundary. That made client and server
work difficult to correlate when diagnosing latency or failures.

## What changed

- Propagate the current W3C trace context on outbound JSON-RPC requests.
- Parent inbound request spans from received trace context.
- Record the received JSON-RPC method on server spans and keep each span
open until its response has been enqueued.
- Add only the OTEL dependencies required by the exec-server crate.

## Stack

Review and land this stack in order:

1. #27466 — trace exec-server JSON-RPC requests **(this PR)**
2. #27467 — record bounded connection, request, and process lifecycle
metrics
3. #27470 — observe remote registration and Noise rendezvous lifecycle

## Validation

- `just test -p codex-exec-server --lib` (153 passed)
- `just bazel-lock-check`
- `just fix -p codex-exec-server`
This commit is contained in:
richardopenai
2026-06-24 12:50:18 -07:00
committed by GitHub
Unverified
parent 4907f0c2c3
commit 74dcce594d
12 changed files with 306 additions and 15 deletions
+5
View File
@@ -2888,6 +2888,7 @@ dependencies = [
"codex-exec-server-protocol",
"codex-file-system",
"codex-network-proxy",
"codex-otel",
"codex-protocol",
"codex-sandboxing",
"codex-test-binary-support",
@@ -2899,6 +2900,8 @@ dependencies = [
"futures",
"http 1.4.0",
"libc",
"opentelemetry",
"opentelemetry_sdk",
"pretty_assertions",
"prost 0.14.3",
"reqwest 0.12.28",
@@ -2913,6 +2916,8 @@ dependencies = [
"tokio-util",
"toml 0.9.11+spec-1.1.0",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"uuid",
"windows-sys 0.52.0",
"wiremock",
+3
View File
@@ -5,6 +5,7 @@
use std::fmt;
use codex_protocol::protocol::W3cTraceContext;
use serde::Deserialize;
use serde::Serialize;
@@ -45,6 +46,8 @@ pub struct JSONRPCRequest {
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub trace: Option<W3cTraceContext>,
}
/// A notification that does not expect a response.
+2
View File
@@ -4,6 +4,8 @@ codex_rust_crate(
name = "exec-server",
crate_name = "codex_exec_server",
deps_extra = [
"@crates//:opentelemetry",
"@crates//:opentelemetry_sdk",
"@crates//:toml",
],
extra_binaries = [
+5
View File
@@ -21,6 +21,7 @@ codex-client = { workspace = true }
codex-exec-server-protocol = { workspace = true }
codex-file-system = { workspace = true }
codex-network-proxy = { workspace = true }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-sandboxing = { workspace = true }
codex-utils-absolute-path = { workspace = true }
@@ -65,8 +66,12 @@ anyhow = { workspace = true }
codex-test-binary-support = { workspace = true }
ctor = { workspace = true }
http = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }
test-case = "3.3.1"
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
wiremock = { workspace = true }
+113 -2
View File
@@ -20,6 +20,7 @@ use tokio::sync::watch;
use tokio_util::task::AbortOnDropHandle;
use tokio::time::timeout;
use tracing::Instrument;
use tracing::debug;
use crate::ProcessId;
@@ -659,7 +660,7 @@ impl ExecServerClient {
};
let client = self.clone();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let process_start_task = async move {
let _active_start = active_start;
match client
.call_rpc::<_, ExecResponse>(&rpc_client, EXEC_METHOD, &params)
@@ -690,7 +691,8 @@ impl ExecServerClient {
let _ = result_tx.send(Err(error));
}
}
});
};
tokio::spawn(process_start_task.in_current_span());
return result_rx.await.map_err(|_| {
ExecServerError::Protocol("process start task stopped unexpectedly".to_string())
})?;
@@ -1197,8 +1199,11 @@ mod tests {
use codex_exec_server_protocol::JSONRPCMessage;
use codex_exec_server_protocol::JSONRPCNotification;
use codex_exec_server_protocol::JSONRPCResponse;
use codex_utils_path_uri::PathUri;
use futures::SinkExt;
use futures::StreamExt;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::SdkTracerProvider;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
#[cfg(unix)]
@@ -1223,6 +1228,9 @@ mod tests {
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
use tracing::Instrument;
use tracing_subscriber::filter::filter_fn;
use tracing_subscriber::prelude::*;
use super::ExecServerClient;
use super::ExecServerClientConnectOptions;
@@ -1238,6 +1246,7 @@ mod tests {
use crate::process::ExecProcessEvent;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::EXEC_READ_METHOD;
use crate::protocol::EXEC_WRITE_METHOD;
@@ -1245,6 +1254,8 @@ mod tests {
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecOutputStream;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeResponse;
@@ -1277,6 +1288,106 @@ mod tests {
.expect("json-rpc line should write");
}
// Verifies that the trace context of the span wrapped around `start_process`
// is the one carried by the `EXEC_METHOD` request that reaches the server.
//
// A hand-written fake server runs on in-memory duplex pipes: it answers the
// initialize handshake, captures the `trace` field of the process start
// request, replies, and returns the captured value for the final assertion.
#[tokio::test(flavor = "current_thread")]
async fn process_start_propagates_caller_trace_context_across_background_task() {
// One duplex pair per direction: client -> server and server -> client.
let (client_stdin, server_reader) = duplex(1 << 20);
let (mut server_writer, client_stdout) = duplex(1 << 20);
let server = tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
// Step 1: the first message must be the initialize request.
let initialize = read_jsonrpc_line(&mut lines).await;
let initialize = match initialize {
JSONRPCMessage::Request(request) if request.method == INITIALIZE_METHOD => request,
other => panic!("expected initialize request, got {other:?}"),
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize.id,
result: serde_json::to_value(InitializeResponse {
session_id: "trace-test".to_string(),
})
.expect("initialize response should serialize"),
}),
)
.await;
// Step 2: the client must follow up with the initialized notification.
match read_jsonrpc_line(&mut lines).await {
JSONRPCMessage::Notification(notification)
if notification.method == INITIALIZED_METHOD => {}
other => panic!("expected initialized notification, got {other:?}"),
}
// Step 3: the next request must be the process start call under test.
let request = match read_jsonrpc_line(&mut lines).await {
JSONRPCMessage::Request(request) if request.method == EXEC_METHOD => request,
other => panic!("expected process start request, got {other:?}"),
};
// Keep the trace carrier before `request.params` is moved out below.
let trace = request.trace.clone();
let params: ExecParams =
serde_json::from_value(request.params.expect("process start params should exist"))
.expect("process start params should deserialize");
// Echo the requested process id back so the client call succeeds.
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: request.id,
result: serde_json::to_value(ExecResponse {
process_id: params.process_id,
})
.expect("process start response should serialize"),
}),
)
.await;
// The task's output is the trace carrier seen on the wire.
trace
});
let client = ExecServerClient::connect(
JsonRpcConnection::from_stdio(
client_stdout,
client_stdin,
"trace-test-client".to_string(),
),
ExecServerClientConnectOptions::default(),
)
.await
.expect("client should connect");
// The OpenTelemetry-backed subscriber is installed only after the client
// has connected, and stays active until `_subscriber_guard` is dropped.
let tracer_provider = SdkTracerProvider::builder().build();
let tracer = tracer_provider.tracer("exec-server-test");
let subscriber = tracing_subscriber::registry().with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter)),
);
let _subscriber_guard = tracing::subscriber::set_default(subscriber);
tracing::callsite::rebuild_interest_cache();
// The expected carrier is derived from the same span that later wraps the
// `start_process` call.
let parent_span = tracing::info_span!("process-start-parent");
let expected_trace = codex_otel::span_w3c_trace_context(&parent_span)
.expect("parent span should have trace context");
let process_id = ProcessId::from("trace-process");
let session = client
.start_process(ExecParams {
process_id: process_id.clone(),
argv: vec!["true".to_string()],
cwd: PathUri::from_host_native_path(std::env::current_dir().expect("cwd"))
.expect("cwd URI"),
env_policy: None,
env: HashMap::new(),
tty: false,
pipe_stdin: false,
arg0: None,
sandbox: None,
enforce_managed_network: false,
managed_network: None,
})
.instrument(parent_span)
.await
.expect("process start should succeed");
assert_eq!(session.process_id(), &process_id);
// The server must have received a trace carrier, and it must equal the one
// computed from the caller's span.
let trace = server.await.expect("server task").expect("trace context");
assert_eq!(trace, expected_trace);
}
async fn accept_websocket(listener: &TcpListener) -> WebSocketStream<TcpStream> {
let (stream, _) = listener.accept().await.expect("listener should accept");
accept_async(stream)
+2
View File
@@ -667,6 +667,7 @@ mod tests {
id: RequestId::Integer(1),
method: "test".to_string(),
params: None,
trace: None,
});
server_websocket
@@ -731,6 +732,7 @@ mod tests {
id: RequestId::Integer(1),
method: "test".to_string(),
params: None,
trace: None,
})
}
+1
View File
@@ -1101,6 +1101,7 @@ mod tests {
id: RequestId::Integer(1),
method: "test".to_string(),
params: None,
trace: None,
})
}
+58
View File
@@ -362,6 +362,7 @@ impl RpcClient {
id: request_id.clone(),
method: method.to_string(),
params: Some(params),
trace: codex_otel::current_span_w3c_trace_context(),
}))
.await
.is_err()
@@ -553,11 +554,17 @@ mod tests {
use codex_exec_server_protocol::JSONRPCMessage;
use codex_exec_server_protocol::JSONRPCResponse;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::InMemorySpanExporter;
use opentelemetry_sdk::trace::SdkTracerProvider;
use pretty_assertions::assert_eq;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::time::timeout;
use tracing::Instrument;
use tracing_subscriber::filter::filter_fn;
use tracing_subscriber::prelude::*;
use super::RpcClient;
use crate::connection::JsonRpcConnection;
@@ -663,4 +670,55 @@ mod tests {
panic!("server task failed: {err}");
}
}
// Verifies that `RpcClient::call` attaches the trace context of the span it
// runs in to the outgoing JSON-RPC request.
//
// A fake server on in-memory duplex pipes reads a single request, replies with
// an empty JSON object, and returns the request's `trace` field.
#[tokio::test(flavor = "current_thread")]
async fn rpc_client_propagates_current_trace_context() {
// Install an OpenTelemetry-backed subscriber for the rest of the test; it
// stays active until `_subscriber_guard` is dropped.
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter)
.build();
let tracer = tracer_provider.tracer("exec-server-test");
let subscriber = tracing_subscriber::registry().with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter)),
);
let _subscriber_guard = tracing::subscriber::set_default(subscriber);
tracing::callsite::rebuild_interest_cache();
// The expected carrier is derived from the same span that later wraps the
// client call.
let parent_span = tracing::info_span!("outbound-parent");
let expected_trace = codex_otel::span_w3c_trace_context(&parent_span)
.expect("parent span should have trace context");
// One duplex pair per direction: client -> server and server -> client.
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
let connection =
JsonRpcConnection::from_stdio(client_stdout, client_stdin, "test-rpc".to_string());
let (client, _events_rx) = RpcClient::new(connection);
let server = tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
// Any request is accepted here; only its `trace` field is inspected.
let request = match read_jsonrpc_line(&mut lines).await {
JSONRPCMessage::Request(request) => request,
other => panic!("expected JSON-RPC request, got {other:?}"),
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: request.id.clone(),
result: serde_json::json!({}),
}),
)
.await;
// The task's output is the trace carrier seen on the wire.
request.trace
});
let response = client
.call::<_, serde_json::Value>("traced", &serde_json::json!({}))
.instrument(parent_span)
.await
.expect("RPC response");
assert_eq!(response, serde_json::json!({}));
// The server must have received a trace carrier, and it must equal the one
// computed from the caller's span.
let trace = server.await.expect("server task").expect("trace context");
assert_eq!(trace, expected_trace);
}
}
+114 -13
View File
@@ -1,6 +1,7 @@
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::Instrument;
use tracing::debug;
use tracing::warn;
@@ -101,30 +102,40 @@ async fn run_connection(
JsonRpcConnectionEvent::Message(message) => match message {
codex_exec_server_protocol::JSONRPCMessage::Request(request) => {
if let Some(route) = router.request_route(request.method.as_str()) {
let request_span = request_span(request.method.as_str(), &request);
let message = tokio::select! {
message = route(Arc::clone(&handler), request) => message,
message = route(Arc::clone(&handler), request).instrument(request_span.clone()) => message,
_ = disconnected_rx.changed() => {
request_span.record("result", "disconnected");
debug!("exec-server transport disconnected while handling request");
break;
}
};
let result = request_result(&message);
if let Some(message) = message
&& outgoing_tx.send(message).await.is_err()
{
request_span.record("result", "disconnected");
break;
}
} else if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_found(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
.await
.is_err()
{
break;
request_span.record("result", result);
} else {
let request_span = request_span("unknown", &request);
if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_found(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
.await
.is_err()
{
request_span.record("result", "disconnected");
break;
}
request_span.record("result", "error");
}
}
codex_exec_server_protocol::JSONRPCMessage::Notification(notification) => {
@@ -184,6 +195,36 @@ async fn run_connection(
let _ = outbound_task.await;
}
fn request_span(
span_name: &str,
request: &codex_exec_server_protocol::JSONRPCRequest,
) -> tracing::Span {
let method = request.method.as_str();
let span = tracing::info_span!(
"codex.exec_server.request",
otel.kind = "server",
otel.name = span_name,
method,
result = tracing::field::Empty,
);
if let Some(trace) = &request.trace
&& !codex_otel::set_parent_from_w3c_trace_context(&span, trace)
{
warn!(method, "ignoring invalid inbound exec-server trace carrier");
}
span
}
fn request_result(message: &Option<RpcServerOutboundMessage>) -> &'static str {
match message {
Some(RpcServerOutboundMessage::Error { .. }) => "error",
Some(
RpcServerOutboundMessage::Response { .. } | RpcServerOutboundMessage::Notification(_),
)
| None => "success",
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
@@ -196,6 +237,11 @@ mod tests {
use codex_exec_server_protocol::JSONRPCResponse;
use codex_exec_server_protocol::RequestId;
use codex_utils_path_uri::PathUri;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::TraceId;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::InMemorySpanExporter;
use opentelemetry_sdk::trace::SdkTracerProvider;
use pretty_assertions::assert_eq;
use serde::Serialize;
use serde::de::DeserializeOwned;
@@ -207,7 +253,10 @@ mod tests {
use tokio::io::duplex;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tracing_subscriber::filter::filter_fn;
use tracing_subscriber::prelude::*;
use super::request_span;
use super::run_connection;
use crate::ExecServerRuntimePaths;
use crate::ProcessId;
@@ -228,6 +277,57 @@ mod tests {
use crate::protocol::TerminateResponse;
use crate::server::session_registry::SessionRegistry;
// Verifies three properties of the span built by `request_span`:
// - the exported span name is the `span_name` argument ("unknown" here), not
//   the method string from the request;
// - the method string from the request is exported as the `method` attribute;
// - the span's trace id and parent span id come from the inbound trace carrier.
#[test]
fn request_span_uses_bounded_name_wire_method_and_inbound_trace_parent() {
// Finished spans are collected in memory; the exporter is cloned so the test
// can read them back after the provider has taken its own copy.
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("exec-server-test");
let subscriber = tracing_subscriber::registry().with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(codex_otel::OtelProvider::trace_export_filter)),
);
// Fixed ids make the inbound carrier, and therefore the assertions at the
// end of the test, deterministic.
let trace_id = TraceId::from_hex("00000000000000000000000000000001").expect("trace id");
let parent_span_id = SpanId::from_hex("0000000000000002").expect("span id");
let trace = codex_protocol::protocol::W3cTraceContext {
traceparent: Some(format!("00-{trace_id}-{parent_span_id}-01")),
tracestate: None,
};
let method = "custom/method";
// The subscriber is only the default inside this closure.
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let request = JSONRPCRequest {
id: RequestId::Integer(1),
method: method.to_string(),
params: None,
trace: Some(trace),
};
// The span name ("unknown") deliberately differs from the wire method.
let request_span = request_span("unknown", &request);
// Enter the span once, then drop it before the flush below.
request_span.in_scope(|| {});
drop(request_span);
});
tracer_provider.force_flush().expect("flush traces");
let spans = span_exporter.get_finished_spans().expect("span export");
// The exported span is looked up by the `span_name` argument.
let request_span = spans
.iter()
.find(|span| span.name.as_ref() == "unknown")
.expect("unknown method span");
// The wire method must be exported as the `method` attribute.
assert_eq!(
request_span
.attributes
.iter()
.find(|attribute| attribute.key.as_str() == "method")
.map(|attribute| attribute.value.clone()),
Some(opentelemetry::Value::String(method.into()))
);
// Trace id and parent span id must match the inbound carrier.
assert_eq!(request_span.span_context.trace_id(), trace_id);
assert_eq!(request_span.parent_span_id, parent_span_id);
}
#[tokio::test]
async fn connection_accepts_pipelined_scalar_requests() {
let registry = SessionRegistry::new();
@@ -382,6 +482,7 @@ mod tests {
id: RequestId::Integer(id),
method: method.to_string(),
params: Some(serde_json::to_value(params).expect("serialize params")),
trace: None,
}),
)
.await;
@@ -74,6 +74,7 @@ async fn stdio_listen_transport_serves_initialize() {
})
.expect("initialize params should serialize"),
),
trace: None,
});
write_jsonrpc_line(&mut client_writer, &initialize).await;
@@ -165,6 +165,7 @@ impl ExecServerHarness {
id: id.clone(),
method: method.to_string(),
params: Some(params),
trace: None,
}))
.await?;
Ok(id)
+1
View File
@@ -78,6 +78,7 @@ async fn exec_server_accepts_binary_websocket_json() -> anyhow::Result<()> {
client_name: "exec-server-binary-test".to_string(),
resume_session_id: None,
})?),
trace: None,
});
server
.send_raw_binary(serde_json::to_vec(&initialize)?)