mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
[codex] Initialize exec-server OpenTelemetry at startup (#25019)
## Summary - Initialize stderr tracing and the configured OpenTelemetry provider for local and remote `codex exec-server` startup. - Instrument the local and remote server entrypoints with a root runtime span. - Keep raw Noise environment, registration, and stream identifiers out of exported spans while preserving them in local debug events. - Keep telemetry setup in a focused CLI module instead of growing the top-level command entrypoint. ## Stack - Previous: none (`#27058` has merged) - Next: #27466 ## Validation - `just test -p codex-exec-server --lib` (139 passed) - `just test -p codex-cli --test exec_server` (3 passed) - `just bazel-lock-check` - `just fix -p codex-exec-server -p codex-cli` - `just fmt` --------- Co-authored-by: Richard Lee <richardlee@openai.com>
This commit is contained in:
committed by
GitHub
Unverified
parent
e8dd1b45cb
commit
4c7228e423
@@ -0,0 +1,41 @@
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
const DEFAULT_ANALYTICS_ENABLED: bool = false;
|
||||
const DEFAULT_LOG_FILTER: &str = "error,opentelemetry_sdk=off,opentelemetry_otlp=off";
|
||||
const OTEL_SERVICE_NAME: &str = "codex-exec-server";
|
||||
|
||||
pub(crate) fn init(
|
||||
config: Option<&codex_core::config::Config>,
|
||||
) -> Result<impl Send + Sync, Box<dyn std::error::Error>> {
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_writer(std::io::stderr)
|
||||
.with_filter(stderr_env_filter());
|
||||
let otel = match config {
|
||||
Some(config) => codex_core::otel_init::build_provider(
|
||||
config,
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
Some(OTEL_SERVICE_NAME),
|
||||
DEFAULT_ANALYTICS_ENABLED,
|
||||
),
|
||||
None => Ok(None),
|
||||
};
|
||||
let provider = otel.as_ref().ok().and_then(Option::as_ref);
|
||||
codex_core::otel_init::record_process_start(provider, OTEL_SERVICE_NAME);
|
||||
|
||||
let otel_logger_layer = provider.and_then(|otel| otel.logger_layer());
|
||||
let otel_tracing_layer = provider.and_then(|otel| otel.tracing_layer());
|
||||
let _ = tracing_subscriber::registry()
|
||||
.with(fmt_layer)
|
||||
.with(otel_tracing_layer)
|
||||
.with(otel_logger_layer)
|
||||
.try_init();
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
otel
|
||||
}
|
||||
|
||||
fn stderr_env_filter() -> EnvFilter {
|
||||
EnvFilter::try_from_default_env()
|
||||
.or_else(|_| EnvFilter::try_new(DEFAULT_LOG_FILTER))
|
||||
.unwrap_or_else(|_| EnvFilter::new("error"))
|
||||
}
|
||||
@@ -49,6 +49,7 @@ mod app_cmd;
|
||||
#[cfg(any(target_os = "macos", target_os = "windows"))]
|
||||
mod desktop_app;
|
||||
mod doctor;
|
||||
mod exec_server_telemetry;
|
||||
mod marketplace_cmd;
|
||||
mod mcp_cmd;
|
||||
mod plugin_cmd;
|
||||
@@ -1692,6 +1693,9 @@ async fn run_exec_server_command(
|
||||
.environment_id
|
||||
.ok_or_else(|| anyhow::anyhow!("--environment-id is required when --remote is set"))?;
|
||||
let config = load_exec_server_config(root_config_overrides, strict_config).await?;
|
||||
let _otel = exec_server_telemetry::init(Some(&config))
|
||||
.inspect_err(|err| eprintln!("Could not create otel exporter: {err}"))
|
||||
.ok();
|
||||
let auth_provider =
|
||||
load_exec_server_remote_auth_provider(&config, &base_url, cmd.use_agent_identity_auth)
|
||||
.await?;
|
||||
@@ -1706,12 +1710,15 @@ async fn run_exec_server_command(
|
||||
codex_exec_server::run_remote_environment(remote_config, runtime_paths).await?;
|
||||
Ok(())
|
||||
} else {
|
||||
if strict_config {
|
||||
// Local exec-server startup does not consume Config, but strict
|
||||
// mode should still reject unknown fields before opening a listener.
|
||||
let _validated_config =
|
||||
load_exec_server_config(root_config_overrides, strict_config).await?;
|
||||
}
|
||||
let config_result = load_exec_server_config(root_config_overrides, strict_config).await;
|
||||
let config = if strict_config {
|
||||
Some(config_result?)
|
||||
} else {
|
||||
config_result.ok()
|
||||
};
|
||||
let _otel = exec_server_telemetry::init(config.as_ref())
|
||||
.inspect_err(|err| eprintln!("Could not create otel exporter: {err}"))
|
||||
.ok();
|
||||
let listen_url = cmd
|
||||
.listen
|
||||
.as_deref()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Result;
|
||||
use predicates::prelude::PredicateBooleanExt;
|
||||
use predicates::str::contains;
|
||||
use tempfile::TempDir;
|
||||
|
||||
@@ -33,3 +34,17 @@ foo = "bar"
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_exec_server_ignores_invalid_config_without_strict_config() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
std::fs::write(codex_home.path().join("config.toml"), "not valid toml = [")?;
|
||||
|
||||
let mut cmd = codex_command(codex_home.path())?;
|
||||
cmd.args(["exec-server", "--listen", "stdio"])
|
||||
.assert()
|
||||
.success()
|
||||
.stderr(contains("not valid toml").not());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -86,12 +86,10 @@ where
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let (disconnected_tx, disconnected_rx) = watch::channel(false);
|
||||
let stream_span = tracing::debug_span!(
|
||||
"noise_relay.stream",
|
||||
noise_side = "harness",
|
||||
environment_id = %environment_id,
|
||||
executor_registration_id = %executor_registration_id,
|
||||
stream_id = %stream_id,
|
||||
let stream_span = tracing::debug_span!("noise_relay.stream", noise_side = "harness",);
|
||||
debug!(
|
||||
environment_id,
|
||||
executor_registration_id, stream_id, "Noise harness relay details"
|
||||
);
|
||||
|
||||
let websocket_task = tokio::spawn(async move {
|
||||
|
||||
@@ -440,15 +440,7 @@ pub(crate) trait HarnessKeyValidator: Send + Sync {
|
||||
///
|
||||
/// Parsing the first Noise message authenticates the harness key. Only a
|
||||
/// successful registry check turns that pending handshake into a virtual stream.
|
||||
#[tracing::instrument(
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(
|
||||
noise_side = "executor",
|
||||
environment_id = %environment_id,
|
||||
executor_registration_id = %executor_registration_id,
|
||||
)
|
||||
)]
|
||||
#[tracing::instrument(level = "debug", skip_all, fields(noise_side = "executor"))]
|
||||
pub(crate) async fn run_multiplexed_environment<S, V>(
|
||||
stream: WebSocketStream<S>,
|
||||
processor: ConnectionProcessor,
|
||||
@@ -460,6 +452,10 @@ pub(crate) async fn run_multiplexed_environment<S, V>(
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
V: HarnessKeyValidator + Clone + 'static,
|
||||
{
|
||||
debug!(
|
||||
environment_id,
|
||||
executor_registration_id, "Noise executor relay details"
|
||||
);
|
||||
let (mut websocket_sink, mut websocket_stream) = stream.split();
|
||||
let (physical_outgoing_tx, mut physical_outgoing_rx) =
|
||||
mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
|
||||
|
||||
@@ -408,6 +408,11 @@ impl RemoteEnvironmentConfig {
|
||||
/// reconnects. The registration and rendezvous URL are also reused until
|
||||
/// rendezvous rejects the URL, at which point the next attempt registers again.
|
||||
/// The websocket carries cleartext routing metadata and encrypted payloads.
|
||||
#[tracing::instrument(
|
||||
name = "codex.exec_server",
|
||||
skip_all,
|
||||
fields(otel.kind = "internal")
|
||||
)]
|
||||
pub async fn run_remote_environment(
|
||||
config: RemoteEnvironmentConfig,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
|
||||
@@ -13,6 +13,11 @@ pub use transport::ExecServerListenUrlParseError;
|
||||
|
||||
use crate::ExecServerRuntimePaths;
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "codex.exec_server",
|
||||
skip_all,
|
||||
fields(otel.kind = "internal")
|
||||
)]
|
||||
pub async fn run_main(
|
||||
listen_url: &str,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
|
||||
Reference in New Issue
Block a user