mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
[codex-app-server-test-client & codex-app-server] Plugin Usage Analytics Smoke Test (#27099)
## This PR The original [combined remote plugin analytics PR #26281](https://github.com/openai/codex/pull/26281) mixed reusable analytics test infrastructure, two manual smoke workflows, a metadata refactor, and the final identity behavior. This PR establishes a non-mutating end-to-end plugin smoke workflow before any analytics identity semantics change. - Add `plugin-analytics-smoke` to the existing app-server test client. - Exercise plugin disable, enable, and use through production app-server RPC paths. - Isolate config writes in a temporary file and use a loopback Responses API server. - Capture analytics without sending them to the production analytics backend. - Validate the current local `plugin_id`, names, capability metadata, thread, turn, and model fields. This is intentionally a baseline smoke workflow. It does not assert `remote_plugin_id`; the final PR will update it when that field exists. Review this PR as the net diff against #27093. ## Testing - The test-client target compiles successfully. - The combined reference branch exercised the manual smoke against the live remote plugin service. - CI is green across the required platform matrix. ## Split Overview ```text main ├── #27093 Debug analytics capture │ └── #27099 Non-mutating plugin smoke ← you are here │ └── #27100 Remote install/uninstall smoke └── #27102 Plugin telemetry metadata refactor After #27093, #27099, #27100, and #27102 merge: └── Final PR: add remote_plugin_id to plugin analytics ``` Review order and dependencies: 1. [#27093 Add debug-only analytics event capture](https://github.com/openai/codex/pull/27093) (based on `main`) 2. [#27099 Add a plugin analytics smoke workflow](https://github.com/openai/codex/pull/27099) **(this PR, stacked on #27093)** 3. [#27100 Add a remote plugin analytics mutation smoke workflow](https://github.com/openai/codex/pull/27100) (stacked on this PR) 4. [#27102 Centralize plugin telemetry metadata construction](https://github.com/openai/codex/pull/27102) (independent, based on `main`) 5. Final remote-ID behavior PR (created after PRs 1-4 merge) The original [#26281](https://github.com/openai/codex/pull/26281) remains open as the green aggregate reference until the final PR is published.
This commit is contained in:
committed by
GitHub
Unverified
parent
a544f5a612
commit
a376781a3c
@@ -18,6 +18,37 @@ cargo run -p codex-app-server-test-client -- \
|
||||
cargo run -p codex-app-server-test-client -- model-list
|
||||
```
|
||||
|
||||
## Testing Plugin Analytics
|
||||
|
||||
The `plugin-analytics-smoke` command exercises `plugin/installed`, plugin
|
||||
enable/disable config writes, and a structured plugin mention through one
|
||||
app-server connection. Analytics are captured to a local JSONL file and are
|
||||
not sent to the analytics backend. The model turn uses a loopback Responses
|
||||
API server.
|
||||
|
||||
The selected plugin must already be installed and enabled remotely, and the
|
||||
active Codex profile must be authenticated. On a fresh local cache, the command
|
||||
retries ephemeral turns while the installed remote bundle finishes syncing.
|
||||
|
||||
```bash
|
||||
# Build a debug Codex binary; analytics capture is unavailable in release builds.
|
||||
cargo build -p codex-cli --bin codex
|
||||
|
||||
cargo run -p codex-app-server-test-client -- \
|
||||
--codex-bin ./target/debug/codex \
|
||||
plugin-analytics-smoke \
|
||||
--plugin-id linear@openai-curated-remote
|
||||
```
|
||||
|
||||
Use `--capture-file /tmp/plugin-analytics.jsonl` to select the output path.
|
||||
The command validates one `codex_plugin_disabled`, `codex_plugin_enabled`, and
|
||||
`codex_plugin_used` event with the expected local plugin identity and capability
|
||||
metadata. The enabled and disabled events come from successful writes to the
|
||||
temporary config; the command does not mutate the remote enabled state. It
|
||||
prints the events and leaves the JSONL file in place for inspection. It does not
|
||||
install or uninstall plugins and does not modify the profile's persistent
|
||||
config.
|
||||
|
||||
## Watching Raw Inbound Traffic
|
||||
|
||||
Initialize a connection, then print every inbound JSON-RPC message until you stop it with
|
||||
|
||||
@@ -87,6 +87,9 @@ use tungstenite::stream::MaybeTlsStream;
|
||||
use url::Url;
|
||||
use uuid::Uuid;
|
||||
|
||||
mod loopback_responses_server;
|
||||
mod plugin_analytics_smoke;
|
||||
|
||||
const NOTIFICATIONS_TO_OPT_OUT: &[&str] = &[
|
||||
// v2 item deltas.
|
||||
"command/exec/outputDelta",
|
||||
@@ -272,6 +275,16 @@ enum CliCommand {
|
||||
#[arg(long, default_value_t = 15)]
|
||||
hold_seconds: u64,
|
||||
},
|
||||
/// Exercise remote plugin analytics through production app-server RPC paths.
|
||||
#[command(name = "plugin-analytics-smoke")]
|
||||
PluginAnalyticsSmoke {
|
||||
/// Installed local plugin id, such as `linear@openai-curated-remote`.
|
||||
#[arg(long)]
|
||||
plugin_id: String,
|
||||
/// JSONL output path. Defaults to a PID-specific file under the system temp directory.
|
||||
#[arg(long)]
|
||||
capture_file: Option<PathBuf>,
|
||||
},
|
||||
}
|
||||
|
||||
pub async fn run() -> Result<()> {
|
||||
@@ -423,6 +436,17 @@ pub async fn run() -> Result<()> {
|
||||
hold_seconds,
|
||||
)
|
||||
}
|
||||
CliCommand::PluginAnalyticsSmoke {
|
||||
plugin_id,
|
||||
capture_file,
|
||||
} => {
|
||||
ensure_dynamic_tools_unused(&dynamic_tools, "plugin-analytics-smoke")?;
|
||||
if url.is_some() {
|
||||
bail!("plugin-analytics-smoke requires --codex-bin and does not support --url");
|
||||
}
|
||||
let codex_bin = codex_bin.context("plugin-analytics-smoke requires --codex-bin")?;
|
||||
plugin_analytics_smoke::run(&codex_bin, &config_overrides, &plugin_id, capture_file)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1440,6 +1464,14 @@ impl CodexClient {
|
||||
}
|
||||
|
||||
fn spawn_stdio(codex_bin: &Path, config_overrides: &[String]) -> Result<Self> {
|
||||
Self::spawn_stdio_with_env(codex_bin, config_overrides, &[])
|
||||
}
|
||||
|
||||
fn spawn_stdio_with_env(
|
||||
codex_bin: &Path,
|
||||
config_overrides: &[String],
|
||||
environment: &[(OsString, OsString)],
|
||||
) -> Result<Self> {
|
||||
let codex_bin_display = codex_bin.display();
|
||||
let mut cmd = Command::new(codex_bin);
|
||||
if let Some(codex_bin_parent) = codex_bin.parent() {
|
||||
@@ -1453,6 +1485,9 @@ impl CodexClient {
|
||||
for override_kv in config_overrides {
|
||||
cmd.arg("--config").arg(override_kv);
|
||||
}
|
||||
for (name, value) in environment {
|
||||
cmd.env(name, value);
|
||||
}
|
||||
let mut codex_app_server = cmd
|
||||
.arg("app-server")
|
||||
.stdin(Stdio::piped())
|
||||
|
||||
@@ -0,0 +1,145 @@
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use std::io;
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::net::TcpListener;
|
||||
use std::net::TcpStream;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Duration;
|
||||
|
||||
pub(super) struct LoopbackResponsesServer {
|
||||
base_url: String,
|
||||
shutdown: Arc<AtomicBool>,
|
||||
thread: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl LoopbackResponsesServer {
|
||||
pub(super) fn start() -> Result<Self> {
|
||||
let listener =
|
||||
TcpListener::bind("127.0.0.1:0").context("bind loopback Responses API server")?;
|
||||
listener
|
||||
.set_nonblocking(true)
|
||||
.context("set loopback Responses API server nonblocking")?;
|
||||
let address = listener.local_addr()?;
|
||||
let shutdown = Arc::new(AtomicBool::new(false));
|
||||
let thread_shutdown = Arc::clone(&shutdown);
|
||||
let thread = thread::spawn(move || {
|
||||
while !thread_shutdown.load(Ordering::Relaxed) {
|
||||
match listener.accept() {
|
||||
Ok((stream, _)) => {
|
||||
if let Err(err) = handle_model_connection(stream) {
|
||||
eprintln!("loopback Responses API server error: {err}");
|
||||
}
|
||||
}
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
Err(err) => {
|
||||
eprintln!("loopback Responses API accept error: {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(Self {
|
||||
base_url: format!("http://{address}"),
|
||||
shutdown,
|
||||
thread: Some(thread),
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn base_url(&self) -> &str {
|
||||
&self.base_url
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LoopbackResponsesServer {
|
||||
fn drop(&mut self) {
|
||||
self.shutdown.store(true, Ordering::Relaxed);
|
||||
if let Some(thread) = self.thread.take() {
|
||||
let _ = thread.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_model_connection(mut stream: TcpStream) -> io::Result<()> {
|
||||
stream.set_nonblocking(false)?;
|
||||
stream.set_read_timeout(Some(Duration::from_secs(2)))?;
|
||||
let request = read_http_request(&mut stream)?;
|
||||
let request_line = request
|
||||
.split(|byte| *byte == b'\n')
|
||||
.next()
|
||||
.and_then(|line| std::str::from_utf8(line).ok())
|
||||
.unwrap_or_default();
|
||||
if request_line.starts_with("POST ") && request_line.contains("/responses ") {
|
||||
let body = concat!(
|
||||
"event: response.created\n",
|
||||
"data: {\"type\":\"response.created\",\"response\":{\"id\":\"resp-plugin-analytics\"}}\n\n",
|
||||
"event: response.completed\n",
|
||||
"data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-plugin-analytics\",\"usage\":{\"input_tokens\":0,\"input_tokens_details\":null,\"output_tokens\":0,\"output_tokens_details\":null,\"total_tokens\":0}}}\n\n"
|
||||
);
|
||||
write_http_response(&mut stream, "200 OK", "text/event-stream", body)
|
||||
} else {
|
||||
write_http_response(
|
||||
&mut stream,
|
||||
"404 Not Found",
|
||||
"application/json",
|
||||
r#"{"error":"not found"}"#,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn read_http_request(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
|
||||
let mut request = Vec::new();
|
||||
let mut buffer = [0_u8; 4096];
|
||||
let header_end = loop {
|
||||
let read = stream.read(&mut buffer)?;
|
||||
if read == 0 {
|
||||
return Ok(request);
|
||||
}
|
||||
request.extend_from_slice(&buffer[..read]);
|
||||
if let Some(position) = request.windows(4).position(|window| window == b"\r\n\r\n") {
|
||||
break position + 4;
|
||||
}
|
||||
};
|
||||
let content_length = parse_content_length(&request[..header_end]);
|
||||
while request.len() < header_end + content_length {
|
||||
let read = stream.read(&mut buffer)?;
|
||||
if read == 0 {
|
||||
break;
|
||||
}
|
||||
request.extend_from_slice(&buffer[..read]);
|
||||
}
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
fn parse_content_length(headers: &[u8]) -> usize {
|
||||
String::from_utf8_lossy(headers)
|
||||
.lines()
|
||||
.find_map(|line| {
|
||||
let (name, value) = line.split_once(':')?;
|
||||
name.eq_ignore_ascii_case("content-length")
|
||||
.then(|| value.trim().parse().ok())
|
||||
.flatten()
|
||||
})
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
fn write_http_response(
|
||||
stream: &mut TcpStream,
|
||||
status: &str,
|
||||
content_type: &str,
|
||||
body: &str,
|
||||
) -> io::Result<()> {
|
||||
write!(
|
||||
stream,
|
||||
"HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
|
||||
body.len()
|
||||
)?;
|
||||
stream.flush()
|
||||
}
|
||||
@@ -0,0 +1,501 @@
|
||||
use super::CodexClient;
|
||||
use super::loopback_responses_server::LoopbackResponsesServer;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::bail;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigValueWriteParams;
|
||||
use codex_app_server_protocol::ConfigWriteResponse;
|
||||
use codex_app_server_protocol::MergeStrategy;
|
||||
use codex_app_server_protocol::PluginAvailability;
|
||||
use codex_app_server_protocol::PluginInstalledParams;
|
||||
use codex_app_server_protocol::PluginInstalledResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_app_server_protocol::WriteStatus;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::ffi::OsString;
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
const ANALYTICS_CAPTURE_ENV_VAR: &str = "CODEX_ANALYTICS_EVENTS_CAPTURE_FILE";
|
||||
const TEST_USER_CONFIG_ENV_VAR: &str = "CODEX_APP_SERVER_TEST_USER_CONFIG_FILE";
|
||||
const CAPTURE_READY_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const CAPTURE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const CAPTURE_POLL_INTERVAL: Duration = Duration::from_millis(50);
|
||||
const PLUGIN_READY_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const PLUGIN_READY_RETRY_INTERVAL: Duration = Duration::from_millis(250);
|
||||
const MOCK_MODEL_SLUG: &str = "plugin-analytics-smoke";
|
||||
const MOCK_PROVIDER_ID: &str = "plugin_analytics_smoke";
|
||||
|
||||
pub(super) fn run(
|
||||
codex_bin: &Path,
|
||||
config_overrides: &[String],
|
||||
plugin_id: &str,
|
||||
capture_file: Option<PathBuf>,
|
||||
) -> Result<()> {
|
||||
let capture_path = capture_file.unwrap_or_else(|| {
|
||||
std::env::temp_dir().join(format!("codex-plugin-analytics-{}.jsonl", process::id()))
|
||||
});
|
||||
prepare_capture_file(&capture_path)?;
|
||||
|
||||
let temporary_config = TemporaryConfigFile::create()?;
|
||||
let responses_server = LoopbackResponsesServer::start()?;
|
||||
let mut overrides = config_overrides.to_vec();
|
||||
overrides.extend(smoke_config_overrides(responses_server.base_url())?);
|
||||
|
||||
let child_environment = vec![
|
||||
(
|
||||
OsString::from(ANALYTICS_CAPTURE_ENV_VAR),
|
||||
capture_path.as_os_str().to_os_string(),
|
||||
),
|
||||
(
|
||||
OsString::from(TEST_USER_CONFIG_ENV_VAR),
|
||||
temporary_config.path().as_os_str().to_os_string(),
|
||||
),
|
||||
];
|
||||
let mut client = CodexClient::spawn_stdio_with_env(codex_bin, &overrides, &child_environment)?;
|
||||
wait_until_capture_is_ready(&capture_path)?;
|
||||
client.initialize()?;
|
||||
|
||||
let installed = plugin_installed(&mut client)?;
|
||||
let expected = expected_plugin(&installed, plugin_id)?;
|
||||
write_plugin_enabled(
|
||||
&mut client,
|
||||
temporary_config.path(),
|
||||
plugin_id,
|
||||
/*enabled*/ false,
|
||||
)?;
|
||||
write_plugin_enabled(
|
||||
&mut client,
|
||||
temporary_config.path(),
|
||||
plugin_id,
|
||||
/*enabled*/ true,
|
||||
)?;
|
||||
|
||||
wait_for_plugin_usage(&mut client, &capture_path, &expected)?;
|
||||
|
||||
let events = wait_for_plugin_events(&capture_path, plugin_id)?;
|
||||
let validated = validate_plugin_events(events, &expected)?;
|
||||
println!(
|
||||
"\n[plugin analytics smoke validated]\n{}",
|
||||
serde_json::to_string_pretty(&validated)?
|
||||
);
|
||||
println!("capture file: {}", capture_path.display());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_plugin_turn(client: &mut CodexClient, expected: &ExpectedPlugin) -> Result<String> {
|
||||
let thread = client.thread_start(ThreadStartParams {
|
||||
model: Some(MOCK_MODEL_SLUG.to_string()),
|
||||
model_provider: Some(MOCK_PROVIDER_ID.to_string()),
|
||||
base_instructions: Some(String::new()),
|
||||
developer_instructions: Some(String::new()),
|
||||
ephemeral: Some(true),
|
||||
..Default::default()
|
||||
})?;
|
||||
let turn = client.turn_start(TurnStartParams {
|
||||
thread_id: thread.thread.id.clone(),
|
||||
client_user_message_id: None,
|
||||
input: vec![UserInput::Mention {
|
||||
name: expected.plugin_name.clone(),
|
||||
path: format!("plugin://{}", expected.plugin_id),
|
||||
}],
|
||||
..Default::default()
|
||||
})?;
|
||||
client.stream_turn(&thread.thread.id, &turn.turn.id)?;
|
||||
if client.last_turn_status != Some(TurnStatus::Completed) {
|
||||
bail!(
|
||||
"plugin analytics smoke turn did not complete: status={:?}, error={:?}",
|
||||
client.last_turn_status,
|
||||
client.last_turn_error_message
|
||||
);
|
||||
}
|
||||
Ok(turn.turn.id)
|
||||
}
|
||||
|
||||
fn wait_for_plugin_usage(
|
||||
client: &mut CodexClient,
|
||||
capture_path: &Path,
|
||||
expected: &ExpectedPlugin,
|
||||
) -> Result<()> {
|
||||
let deadline = Instant::now() + PLUGIN_READY_TIMEOUT;
|
||||
let mut attempts = 0;
|
||||
loop {
|
||||
attempts += 1;
|
||||
let turn_id = run_plugin_turn(client, expected)?;
|
||||
// Turn completion is queued after plugin usage, so its captured event is the
|
||||
// barrier that tells us whether this attempt resolved the plugin.
|
||||
let events = wait_for_turn_analytics(capture_path, &turn_id)?;
|
||||
if events.iter().any(|event| {
|
||||
event["event_type"] == "codex_plugin_used"
|
||||
&& event["event_params"]["turn_id"].as_str() == Some(turn_id.as_str())
|
||||
&& event["event_params"]["plugin_id"].as_str() == Some(expected.plugin_id.as_str())
|
||||
}) {
|
||||
if attempts > 1 {
|
||||
println!("remote plugin bundle became ready after {attempts} turn attempts");
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
bail!(
|
||||
"timed out waiting for remote plugin bundle `{}` to become usable after {attempts} turn attempts",
|
||||
expected.plugin_id
|
||||
);
|
||||
}
|
||||
thread::sleep(PLUGIN_READY_RETRY_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ExpectedPlugin {
|
||||
plugin_id: String,
|
||||
plugin_name: String,
|
||||
marketplace_name: String,
|
||||
}
|
||||
|
||||
fn plugin_installed(client: &mut CodexClient) -> Result<PluginInstalledResponse> {
|
||||
let request_id = client.request_id();
|
||||
client.send_request(
|
||||
ClientRequest::PluginInstalled {
|
||||
request_id: request_id.clone(),
|
||||
params: PluginInstalledParams {
|
||||
cwds: None,
|
||||
install_suggestion_plugin_names: None,
|
||||
},
|
||||
},
|
||||
request_id,
|
||||
"plugin/installed",
|
||||
)
|
||||
}
|
||||
|
||||
fn expected_plugin(response: &PluginInstalledResponse, plugin_id: &str) -> Result<ExpectedPlugin> {
|
||||
let matches = response
|
||||
.marketplaces
|
||||
.iter()
|
||||
.flat_map(|marketplace| {
|
||||
marketplace
|
||||
.plugins
|
||||
.iter()
|
||||
.filter(move |plugin| plugin.id == plugin_id)
|
||||
.map(move |plugin| (marketplace, plugin))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let [(marketplace, plugin)] = matches.as_slice() else {
|
||||
bail!(
|
||||
"expected exactly one installed plugin with local id `{plugin_id}`, found {}",
|
||||
matches.len()
|
||||
);
|
||||
};
|
||||
if !plugin.installed {
|
||||
bail!("plugin `{plugin_id}` is not installed");
|
||||
}
|
||||
if !plugin.enabled {
|
||||
bail!("plugin `{plugin_id}` is installed remotely but disabled");
|
||||
}
|
||||
if plugin.availability != PluginAvailability::Available {
|
||||
bail!(
|
||||
"plugin `{plugin_id}` is not available: {:?}",
|
||||
plugin.availability
|
||||
);
|
||||
}
|
||||
plugin
|
||||
.remote_plugin_id
|
||||
.as_ref()
|
||||
.with_context(|| format!("plugin `{plugin_id}` does not have a remote plugin id"))?;
|
||||
|
||||
Ok(ExpectedPlugin {
|
||||
plugin_id: plugin.id.clone(),
|
||||
plugin_name: plugin.name.clone(),
|
||||
marketplace_name: marketplace.name.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn write_plugin_enabled(
|
||||
client: &mut CodexClient,
|
||||
config_path: &Path,
|
||||
plugin_id: &str,
|
||||
enabled: bool,
|
||||
) -> Result<()> {
|
||||
let request_id = client.request_id();
|
||||
let response: ConfigWriteResponse = client.send_request(
|
||||
ClientRequest::ConfigValueWrite {
|
||||
request_id: request_id.clone(),
|
||||
params: ConfigValueWriteParams {
|
||||
key_path: format!("plugins.{plugin_id}.enabled"),
|
||||
value: json!(enabled),
|
||||
merge_strategy: MergeStrategy::Replace,
|
||||
file_path: Some(config_path.display().to_string()),
|
||||
expected_version: None,
|
||||
},
|
||||
},
|
||||
request_id,
|
||||
"config/value/write",
|
||||
)?;
|
||||
println!(
|
||||
"< config/value/write plugin={plugin_id} enabled={enabled} status={:?}",
|
||||
response.status
|
||||
);
|
||||
if response.status != WriteStatus::Ok {
|
||||
bail!(
|
||||
"config/value/write for plugin `{plugin_id}` enabled={enabled} was overridden: {:?}",
|
||||
response.overridden_metadata
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn smoke_config_overrides(responses_base_url: &str) -> Result<Vec<String>> {
|
||||
let provider_base_url = serde_json::to_string(&format!("{responses_base_url}/v1"))
|
||||
.context("serialize mock provider base URL")?;
|
||||
Ok(vec![
|
||||
"analytics.enabled=true".to_string(),
|
||||
"features.plugins=true".to_string(),
|
||||
"features.remote_plugin=true".to_string(),
|
||||
format!("model={}", quoted(MOCK_MODEL_SLUG)?),
|
||||
format!("model_provider={}", quoted(MOCK_PROVIDER_ID)?),
|
||||
format!(
|
||||
"model_providers.{MOCK_PROVIDER_ID}.name={}",
|
||||
quoted("Plugin analytics smoke mock provider")?
|
||||
),
|
||||
format!("model_providers.{MOCK_PROVIDER_ID}.base_url={provider_base_url}"),
|
||||
format!(
|
||||
"model_providers.{MOCK_PROVIDER_ID}.wire_api={}",
|
||||
quoted("responses")?
|
||||
),
|
||||
format!("model_providers.{MOCK_PROVIDER_ID}.requires_openai_auth=false"),
|
||||
format!("model_providers.{MOCK_PROVIDER_ID}.request_max_retries=0"),
|
||||
format!("model_providers.{MOCK_PROVIDER_ID}.stream_max_retries=0"),
|
||||
])
|
||||
}
|
||||
|
||||
fn quoted(value: &str) -> Result<String> {
|
||||
serde_json::to_string(value).context("serialize config string")
|
||||
}
|
||||
|
||||
fn prepare_capture_file(path: &Path) -> Result<()> {
|
||||
let parent = path
|
||||
.parent()
|
||||
.context("capture file must have a parent directory")?;
|
||||
if !parent.is_dir() {
|
||||
bail!(
|
||||
"capture file parent directory does not exist: {}",
|
||||
parent.display()
|
||||
);
|
||||
}
|
||||
match fs::remove_file(path) {
|
||||
Ok(()) => {}
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
|
||||
Err(err) => {
|
||||
return Err(err)
|
||||
.with_context(|| format!("remove previous capture file {}", path.display()));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn wait_until_capture_is_ready(path: &Path) -> Result<()> {
|
||||
let deadline = Instant::now() + CAPTURE_READY_TIMEOUT;
|
||||
loop {
|
||||
match fs::metadata(path) {
|
||||
Ok(_) => return Ok(()),
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
|
||||
Err(err) => {
|
||||
return Err(err)
|
||||
.with_context(|| format!("inspect capture file {}", path.display()));
|
||||
}
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
bail!(
|
||||
"analytics capture did not become ready at {}; use a debug Codex binary",
|
||||
path.display()
|
||||
);
|
||||
}
|
||||
thread::sleep(CAPTURE_POLL_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_for_plugin_events(path: &Path, plugin_id: &str) -> Result<Vec<Value>> {
|
||||
let deadline = Instant::now() + CAPTURE_TIMEOUT;
|
||||
loop {
|
||||
let events = read_plugin_events(path, plugin_id)?;
|
||||
if required_event_types()
|
||||
.iter()
|
||||
.all(|event_type| event_count(&events, event_type) >= 1)
|
||||
{
|
||||
return Ok(events);
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
bail!(
|
||||
"timed out waiting for plugin analytics events in {}: found {:?}",
|
||||
path.display(),
|
||||
events
|
||||
.iter()
|
||||
.filter_map(|event| event["event_type"].as_str())
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
thread::sleep(CAPTURE_POLL_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_for_turn_analytics(path: &Path, turn_id: &str) -> Result<Vec<Value>> {
|
||||
let deadline = Instant::now() + CAPTURE_TIMEOUT;
|
||||
loop {
|
||||
let events = read_capture_events(path)?;
|
||||
if events.iter().any(|event| {
|
||||
event["event_type"] == "codex_turn_event"
|
||||
&& event["event_params"]["turn_id"].as_str() == Some(turn_id)
|
||||
}) {
|
||||
return Ok(events);
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
bail!(
|
||||
"timed out waiting for turn analytics for `{turn_id}` in {}",
|
||||
path.display()
|
||||
);
|
||||
}
|
||||
thread::sleep(CAPTURE_POLL_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
fn read_plugin_events(path: &Path, plugin_id: &str) -> Result<Vec<Value>> {
|
||||
Ok(read_capture_events(path)?
|
||||
.into_iter()
|
||||
.filter(|event| event["event_params"]["plugin_id"] == plugin_id)
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn read_capture_events(path: &Path) -> Result<Vec<Value>> {
|
||||
let contents = match fs::read_to_string(path) {
|
||||
Ok(contents) => contents,
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
|
||||
Err(err) => {
|
||||
return Err(err).with_context(|| format!("read capture file {}", path.display()));
|
||||
}
|
||||
};
|
||||
let mut captured = Vec::new();
|
||||
for (index, line) in contents.lines().enumerate() {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let payload: Value = serde_json::from_str(line).with_context(|| {
|
||||
format!(
|
||||
"parse analytics capture line {} from {}",
|
||||
index + 1,
|
||||
path.display()
|
||||
)
|
||||
})?;
|
||||
let events = payload["events"]
|
||||
.as_array()
|
||||
.context("analytics capture payload is missing events")?;
|
||||
captured.extend(events.iter().cloned());
|
||||
}
|
||||
Ok(captured)
|
||||
}
|
||||
|
||||
fn validate_plugin_events(events: Vec<Value>, expected: &ExpectedPlugin) -> Result<Vec<Value>> {
|
||||
let mut validated = Vec::new();
|
||||
for event_type in required_event_types() {
|
||||
let matching = events
|
||||
.iter()
|
||||
.filter(|event| event["event_type"] == event_type)
|
||||
.collect::<Vec<_>>();
|
||||
let [event] = matching.as_slice() else {
|
||||
bail!(
|
||||
"expected exactly one `{event_type}` event for `{}`, found {}",
|
||||
expected.plugin_id,
|
||||
matching.len()
|
||||
);
|
||||
};
|
||||
validate_identity(event, expected)?;
|
||||
if event_type == "codex_plugin_used" {
|
||||
validate_used_metadata(event)?;
|
||||
}
|
||||
validated.push((*event).clone());
|
||||
}
|
||||
Ok(validated)
|
||||
}
|
||||
|
||||
fn required_event_types() -> [&'static str; 3] {
|
||||
[
|
||||
"codex_plugin_disabled",
|
||||
"codex_plugin_enabled",
|
||||
"codex_plugin_used",
|
||||
]
|
||||
}
|
||||
|
||||
fn event_count(events: &[Value], event_type: &str) -> usize {
|
||||
events
|
||||
.iter()
|
||||
.filter(|event| event["event_type"] == event_type)
|
||||
.count()
|
||||
}
|
||||
|
||||
fn validate_identity(event: &Value, expected: &ExpectedPlugin) -> Result<()> {
|
||||
let params = &event["event_params"];
|
||||
require_string(params, "plugin_id", &expected.plugin_id)?;
|
||||
require_string(params, "plugin_name", &expected.plugin_name)?;
|
||||
require_string(params, "marketplace_name", &expected.marketplace_name)
|
||||
}
|
||||
|
||||
fn validate_used_metadata(event: &Value) -> Result<()> {
|
||||
let params = &event["event_params"];
|
||||
for field in [
|
||||
"has_skills",
|
||||
"mcp_server_count",
|
||||
"connector_ids",
|
||||
"mcp_server_names",
|
||||
"thread_id",
|
||||
"turn_id",
|
||||
"model_slug",
|
||||
] {
|
||||
if params.get(field).is_none_or(Value::is_null) {
|
||||
bail!("codex_plugin_used event has null or missing `{field}`");
|
||||
}
|
||||
}
|
||||
require_string(params, "model_slug", MOCK_MODEL_SLUG)
|
||||
}
|
||||
|
||||
fn require_string(params: &Value, field: &str, expected: &str) -> Result<()> {
|
||||
let actual = params.get(field).and_then(Value::as_str);
|
||||
if actual != Some(expected) {
|
||||
bail!("expected `{field}` to be `{expected}`, got {actual:?}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct TemporaryConfigFile {
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl TemporaryConfigFile {
|
||||
fn create() -> Result<Self> {
|
||||
let path = std::env::temp_dir().join(format!(
|
||||
"codex-plugin-analytics-config-{}.toml",
|
||||
process::id()
|
||||
));
|
||||
fs::write(&path, "")
|
||||
.with_context(|| format!("create temporary config file {}", path.display()))?;
|
||||
Ok(Self { path })
|
||||
}
|
||||
|
||||
fn path(&self) -> &Path {
|
||||
&self.path
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TemporaryConfigFile {
|
||||
fn drop(&mut self) {
|
||||
let _ = fs::remove_file(&self.path);
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,8 @@ use codex_config::ThreadConfigLoader;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::resolve_installation_id;
|
||||
use codex_login::AuthManager;
|
||||
#[cfg(debug_assertions)]
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
@@ -120,6 +122,8 @@ pub use crate::transport::take_remote_control_disabled_env;
|
||||
|
||||
const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT";
|
||||
const OTEL_SERVICE_NAME: &str = "codex-app-server";
|
||||
#[cfg(debug_assertions)]
|
||||
const TEST_USER_CONFIG_FILE_ENV_VAR: &str = "CODEX_APP_SERVER_TEST_USER_CONFIG_FILE";
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
enum LogFormat {
|
||||
@@ -437,6 +441,10 @@ pub async fn run_main_with_transport_options(
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
runtime_options: AppServerRuntimeOptions,
|
||||
) -> IoResult<()> {
|
||||
let loader_overrides = loader_overrides_with_test_user_config_file(
|
||||
loader_overrides,
|
||||
test_user_config_file_from_env(),
|
||||
)?;
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
@@ -1295,6 +1303,43 @@ fn emit_state_db_backup_warning(message: &str) {
|
||||
}
|
||||
}
|
||||
|
||||
fn test_user_config_file_from_env() -> Option<std::path::PathBuf> {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
std::env::var_os(TEST_USER_CONFIG_FILE_ENV_VAR)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(std::path::PathBuf::from)
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
None
|
||||
}
|
||||
|
||||
fn loader_overrides_with_test_user_config_file(
|
||||
mut loader_overrides: LoaderOverrides,
|
||||
test_user_config_file: Option<std::path::PathBuf>,
|
||||
) -> IoResult<LoaderOverrides> {
|
||||
#[cfg(debug_assertions)]
|
||||
if let Some(path) = test_user_config_file {
|
||||
let path = AbsolutePathBuf::from_absolute_path(path).map_err(|err| {
|
||||
std::io::Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid test user config path: {err}"),
|
||||
)
|
||||
})?;
|
||||
warn!(
|
||||
path = %path.as_path().display(),
|
||||
"using debug-only app-server test user config file"
|
||||
);
|
||||
loader_overrides.user_config_path = Some(path);
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
let _ = test_user_config_file;
|
||||
|
||||
Ok(loader_overrides)
|
||||
}
|
||||
|
||||
fn analytics_rpc_transport(transport: &AppServerTransport) -> AppServerRpcTransport {
|
||||
match transport {
|
||||
AppServerTransport::Stdio => AppServerRpcTransport::Stdio,
|
||||
@@ -1307,6 +1352,12 @@ fn analytics_rpc_transport(transport: &AppServerTransport) -> AppServerRpcTransp
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::LogFormat;
|
||||
#[cfg(debug_assertions)]
|
||||
use super::loader_overrides_with_test_user_config_file;
|
||||
#[cfg(debug_assertions)]
|
||||
use codex_config::LoaderOverrides;
|
||||
#[cfg(debug_assertions)]
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
@@ -1326,4 +1377,20 @@ mod tests {
|
||||
assert_eq!(LogFormat::from_env_value(Some("text")), LogFormat::Default);
|
||||
assert_eq!(LogFormat::from_env_value(Some("jsonl")), LogFormat::Default);
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[test]
|
||||
fn debug_test_user_config_file_overrides_loader_path() {
|
||||
let path = std::env::temp_dir().join("codex-app-server-test-config.toml");
|
||||
let loader_overrides = loader_overrides_with_test_user_config_file(
|
||||
LoaderOverrides::default(),
|
||||
Some(path.clone()),
|
||||
)
|
||||
.expect("test config path should be valid");
|
||||
|
||||
assert_eq!(
|
||||
loader_overrides.user_config_path,
|
||||
Some(AbsolutePathBuf::from_absolute_path(path).expect("absolute test path"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user