From 81abb44f686359719da32974fc3ceec5c1c52db6 Mon Sep 17 00:00:00 2001 From: xl-openai Date: Fri, 27 Mar 2026 14:21:18 -0700 Subject: [PATCH] plugins: Clean up stale curated plugin sync temp dirs and add sync metrics (#16035) 1. Keep curated plugin staging directories under TempDir ownership until activation succeeds, so failed git/HTTP sync attempts do not leak plugins-clone-*. 2. Best-effort clean up stale plugins-clone-* directories before creating a new staged repo, using a conservative age threshold. 3. Emit OTEL counters for curated plugin startup sync transport attempts and final outcome across git and HTTP paths. --- codex-rs/core/src/plugins/startup_sync.rs | 157 ++++++++++++++++-- .../core/src/plugins/startup_sync_tests.rs | 147 ++++++++++++++++ codex-rs/otel/src/metrics/names.rs | 2 + 3 files changed, 292 insertions(+), 14 deletions(-) diff --git a/codex-rs/core/src/plugins/startup_sync.rs b/codex-rs/core/src/plugins/startup_sync.rs index 3511c10c5..ceeb1d0f8 100644 --- a/codex-rs/core/src/plugins/startup_sync.rs +++ b/codex-rs/core/src/plugins/startup_sync.rs @@ -7,8 +7,11 @@ use std::process::Stdio; use std::sync::Arc; use std::time::Duration; +use codex_otel::metrics::names::CURATED_PLUGINS_STARTUP_SYNC_FINAL_METRIC; +use codex_otel::metrics::names::CURATED_PLUGINS_STARTUP_SYNC_METRIC; use reqwest::Client; use serde::Deserialize; +use tempfile::TempDir; use tracing::info; use tracing::warn; use zip::ZipArchive; @@ -27,6 +30,8 @@ const CURATED_PLUGINS_RELATIVE_DIR: &str = ".tmp/plugins"; const CURATED_PLUGINS_SHA_FILE: &str = ".tmp/plugins.sha"; const CURATED_PLUGINS_GIT_TIMEOUT: Duration = Duration::from_secs(30); const CURATED_PLUGINS_HTTP_TIMEOUT: Duration = Duration::from_secs(30); +// Keep this comfortably above a normal sync attempt so we do not race another Codex process. +const CURATED_PLUGINS_STALE_TEMP_DIR_MAX_AGE: Duration = Duration::from_secs(10 * 60); const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1"; const STARTUP_REMOTE_PLUGIN_SYNC_PREREQUISITE_TIMEOUT: Duration = Duration::from_secs(5); @@ -63,14 +68,23 @@ fn sync_openai_plugins_repo_with_transport_overrides( api_base_url: &str, ) -> Result { match sync_openai_plugins_repo_via_git(codex_home, git_binary) { - Ok(remote_sha) => Ok(remote_sha), + Ok(remote_sha) => { + emit_curated_plugins_startup_sync_metric("git", "success"); + emit_curated_plugins_startup_sync_final_metric("git", "success"); + Ok(remote_sha) + } Err(err) => { + emit_curated_plugins_startup_sync_metric("git", "failure"); warn!( error = %err, git_binary, "git sync failed for curated plugin sync; falling back to GitHub HTTP" ); - sync_openai_plugins_repo_via_http(codex_home, api_base_url) + let result = sync_openai_plugins_repo_via_http(codex_home, api_base_url); + let status = if result.is_ok() { "success" } else { "failure" }; + emit_curated_plugins_startup_sync_metric("http", status); + emit_curated_plugins_startup_sync_final_metric("http", status); + result } } } @@ -85,7 +99,7 @@ fn sync_openai_plugins_repo_via_git(codex_home: &Path, git_binary: &str) -> Resu return Ok(remote_sha); } - let cloned_repo_path = prepare_curated_repo_parent_and_temp_dir(&repo_path)?; + let staged_repo_dir = prepare_curated_repo_parent_and_temp_dir(&repo_path)?; let clone_output = run_git_command_with_timeout( Command::new(git_binary) .env("GIT_OPTIONAL_LOCKS", "0") @@ -93,21 +107,21 @@ fn sync_openai_plugins_repo_via_git(codex_home: &Path, git_binary: &str) -> Resu .arg("--depth") .arg("1") .arg("https://github.com/openai/plugins.git") - .arg(&cloned_repo_path), + .arg(staged_repo_dir.path()), "git clone curated plugins repo", CURATED_PLUGINS_GIT_TIMEOUT, )?; ensure_git_success(&clone_output, "git clone curated plugins repo")?; - let cloned_sha = git_head_sha(&cloned_repo_path, git_binary)?; + let cloned_sha = git_head_sha(staged_repo_dir.path(), git_binary)?; if cloned_sha != remote_sha { return Err(format!( "curated plugins clone HEAD mismatch: expected {remote_sha}, got {cloned_sha}" )); } - ensure_marketplace_manifest_exists(&cloned_repo_path)?; - activate_curated_repo(&repo_path, &cloned_repo_path)?; + ensure_marketplace_manifest_exists(staged_repo_dir.path())?; + activate_curated_repo(&repo_path, staged_repo_dir)?; write_curated_plugins_sha(&sha_path, &remote_sha)?; Ok(remote_sha) } @@ -129,11 +143,11 @@ fn sync_openai_plugins_repo_via_http( return Ok(remote_sha); } - let cloned_repo_path = prepare_curated_repo_parent_and_temp_dir(&repo_path)?; + let staged_repo_dir = prepare_curated_repo_parent_and_temp_dir(&repo_path)?; let zipball_bytes = runtime.block_on(fetch_curated_repo_zipball(api_base_url, &remote_sha))?; - extract_zipball_to_dir(&zipball_bytes, &cloned_repo_path)?; - ensure_marketplace_manifest_exists(&cloned_repo_path)?; - activate_curated_repo(&repo_path, &cloned_repo_path)?; + extract_zipball_to_dir(&zipball_bytes, staged_repo_dir.path())?; + ensure_marketplace_manifest_exists(staged_repo_dir.path())?; + activate_curated_repo(&repo_path, staged_repo_dir)?; write_curated_plugins_sha(&sha_path, &remote_sha)?; Ok(remote_sha) } @@ -227,7 +241,7 @@ async fn write_startup_remote_plugin_sync_marker(codex_home: &Path) -> std::io:: tokio::fs::write(marker_path, b"ok\n").await } -fn prepare_curated_repo_parent_and_temp_dir(repo_path: &Path) -> Result { +fn prepare_curated_repo_parent_and_temp_dir(repo_path: &Path) -> Result { let Some(parent) = repo_path.parent() else { return Err(format!( "failed to determine curated plugins parent directory for {}", @@ -240,6 +254,7 @@ fn prepare_curated_repo_parent_and_temp_dir(repo_path: &Path) -> Result Result entries, + Err(err) => { + warn!( + error = %err, + parent = %parent.display(), + "failed to list curated plugins temp directory parent for stale cleanup" + ); + return; + } + }; + + for entry in entries.flatten() { + let file_type = match entry.file_type() { + Ok(file_type) => file_type, + Err(err) => { + warn!( + error = %err, + path = %entry.path().display(), + "failed to inspect curated plugins temp directory entry" + ); + continue; + } + }; + if !file_type.is_dir() { + continue; + } + + let path = entry.path(); + let is_plugins_clone_dir = path + .file_name() + .and_then(|name| name.to_str()) + .is_some_and(|name| name.starts_with("plugins-clone-")); + if !is_plugins_clone_dir { + continue; + } + + let metadata = match entry.metadata() { + Ok(metadata) => metadata, + Err(err) => { + warn!( + error = %err, + path = %path.display(), + "failed to read curated plugins temp directory metadata" + ); + continue; + } + }; + let modified = match metadata.modified() { + Ok(modified) => modified, + Err(err) => { + warn!( + error = %err, + path = %path.display(), + "failed to read curated plugins temp directory modification time" + ); + continue; + } + }; + let age = match modified.elapsed() { + Ok(age) => age, + Err(err) => { + warn!( + error = %err, + path = %path.display(), + "failed to compute curated plugins temp directory age" + ); + continue; + } + }; + if age < max_age { + continue; + } + + if let Err(err) = std::fs::remove_dir_all(&path) { + warn!( + error = %err, + path = %path.display(), + "failed to remove stale curated plugins temp directory" + ); + } + } +} + +fn emit_curated_plugins_startup_sync_metric(transport: &'static str, status: &'static str) { + emit_curated_plugins_startup_sync_counter( + CURATED_PLUGINS_STARTUP_SYNC_METRIC, + transport, + status, + ); +} + +fn emit_curated_plugins_startup_sync_final_metric(transport: &'static str, status: &'static str) { + emit_curated_plugins_startup_sync_counter( + CURATED_PLUGINS_STARTUP_SYNC_FINAL_METRIC, + transport, + status, + ); +} + +fn emit_curated_plugins_startup_sync_counter( + metric_name: &str, + transport: &'static str, + status: &'static str, +) { + let Some(metrics) = codex_otel::metrics::global() else { + return; + }; + let tags = [("transport", transport), ("status", status)]; + let _ = metrics.counter(metric_name, /*inc*/ 1, &tags); } fn ensure_marketplace_manifest_exists(repo_path: &Path) -> Result<(), String> { @@ -263,7 +391,8 @@ fn ensure_marketplace_manifest_exists(repo_path: &Path) -> Result<(), String> { )) } -fn activate_curated_repo(repo_path: &Path, staged_repo_path: &Path) -> Result<(), String> { +fn activate_curated_repo(repo_path: &Path, staged_repo_dir: TempDir) -> Result<(), String> { + let staged_repo_path = staged_repo_dir.path(); if repo_path.exists() { let parent = repo_path.parent().ok_or_else(|| { format!( diff --git a/codex-rs/core/src/plugins/startup_sync_tests.rs b/codex-rs/core/src/plugins/startup_sync_tests.rs index 66c02c38f..c3d11572c 100644 --- a/codex-rs/core/src/plugins/startup_sync_tests.rs +++ b/codex-rs/core/src/plugins/startup_sync_tests.rs @@ -7,6 +7,7 @@ use crate::plugins::test_support::write_file; use crate::plugins::test_support::write_openai_curated_marketplace; use pretty_assertions::assert_eq; use std::io::Write; +use std::path::Path; use tempfile::tempdir; use wiremock::Mock; use wiremock::MockServer; @@ -17,6 +18,21 @@ use wiremock::matchers::path; use zip::ZipWriter; use zip::write::SimpleFileOptions; +fn has_plugins_clone_dirs(codex_home: &Path) -> bool { + let Ok(entries) = std::fs::read_dir(codex_home.join(".tmp")) else { + return false; + }; + + entries.flatten().any(|entry| { + let path = entry.path(); + path.is_dir() + && path + .file_name() + .and_then(|name| name.to_str()) + .is_some_and(|name| name.starts_with("plugins-clone-")) + }) +} + #[test] fn curated_plugins_repo_path_uses_codex_home_tmp_dir() { let tmp = tempdir().expect("tempdir"); @@ -38,6 +54,49 @@ fn read_curated_plugins_sha_reads_trimmed_sha_file() { ); } +#[cfg(unix)] +#[test] +fn remove_stale_curated_repo_temp_dirs_removes_only_matching_directories() { + use std::os::unix::ffi::OsStrExt; + use std::time::SystemTime; + + fn set_dir_mtime(path: &Path, age: Duration) -> Result<(), Box> { + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; + let modified_at = now.saturating_sub(age); + let tv_sec = i64::try_from(modified_at.as_secs())?; + let ts = libc::timespec { tv_sec, tv_nsec: 0 }; + let times = [ts, ts]; + let c_path = std::ffi::CString::new(path.as_os_str().as_bytes())?; + let result = unsafe { libc::utimensat(libc::AT_FDCWD, c_path.as_ptr(), times.as_ptr(), 0) }; + if result != 0 { + return Err(std::io::Error::last_os_error().into()); + } + Ok(()) + } + + let tmp = tempdir().expect("tempdir"); + let parent = tmp.path().join(".tmp"); + let stale_clone_dir = parent.join("plugins-clone-stale"); + let fresh_clone_dir = parent.join("plugins-clone-fresh"); + let unrelated_dir = parent.join("plugins-cache"); + + std::fs::create_dir_all(&stale_clone_dir).expect("create stale clone dir"); + std::fs::create_dir_all(&fresh_clone_dir).expect("create fresh clone dir"); + std::fs::create_dir_all(&unrelated_dir).expect("create unrelated dir"); + set_dir_mtime( + &stale_clone_dir, + CURATED_PLUGINS_STALE_TEMP_DIR_MAX_AGE + Duration::from_secs(60), + ) + .expect("age stale clone dir"); + set_dir_mtime(&fresh_clone_dir, Duration::ZERO).expect("age fresh clone dir"); + + remove_stale_curated_repo_temp_dirs(&parent, CURATED_PLUGINS_STALE_TEMP_DIR_MAX_AGE); + + assert!(!stale_clone_dir.exists()); + assert!(fresh_clone_dir.is_dir()); + assert!(unrelated_dir.is_dir()); +} + #[cfg(unix)] #[test] fn sync_openai_plugins_repo_prefers_git_when_available() { @@ -229,6 +288,94 @@ exit 1 assert_eq!(read_curated_plugins_sha(tmp.path()).as_deref(), Some(sha)); } +#[cfg(unix)] +#[test] +fn sync_openai_plugins_repo_via_git_cleans_up_staged_dir_on_clone_failure() { + use std::os::unix::fs::PermissionsExt; + + let tmp = tempdir().expect("tempdir"); + let bin_dir = tempfile::Builder::new() + .prefix("fake-git-partial-fail-") + .tempdir() + .expect("tempdir"); + let git_path = bin_dir.path().join("git"); + let sha = "0123456789abcdef0123456789abcdef01234567"; + + std::fs::write( + &git_path, + format!( + r#"#!/bin/sh +if [ "$1" = "ls-remote" ]; then + printf '%s\tHEAD\n' "{sha}" + exit 0 +fi +if [ "$1" = "clone" ]; then + dest="$5" + mkdir -p "$dest/.git" + echo "fatal: early EOF" >&2 + exit 128 +fi +echo "unexpected git invocation: $@" >&2 +exit 1 +"# + ), + ) + .expect("write fake git"); + let mut permissions = std::fs::metadata(&git_path) + .expect("metadata") + .permissions(); + permissions.set_mode(0o755); + std::fs::set_permissions(&git_path, permissions).expect("chmod"); + + let err = sync_openai_plugins_repo_via_git(tmp.path(), git_path.to_str().expect("utf8 path")) + .expect_err("git sync should fail"); + + assert!(err.contains("fatal: early EOF")); + assert!(!has_plugins_clone_dirs(tmp.path())); +} + +#[tokio::test] +async fn sync_openai_plugins_repo_via_http_cleans_up_staged_dir_on_extract_failure() { + let tmp = tempdir().expect("tempdir"); + let server = MockServer::start().await; + let sha = "0123456789abcdef0123456789abcdef01234567"; + + Mock::given(method("GET")) + .and(path("/repos/openai/plugins")) + .respond_with(ResponseTemplate::new(200).set_body_string(r#"{"default_branch":"main"}"#)) + .mount(&server) + .await; + Mock::given(method("GET")) + .and(path("/repos/openai/plugins/git/ref/heads/main")) + .respond_with( + ResponseTemplate::new(200) + .set_body_string(format!(r#"{{"object":{{"sha":"{sha}"}}}}"#)), + ) + .mount(&server) + .await; + Mock::given(method("GET")) + .and(path(format!("/repos/openai/plugins/zipball/{sha}"))) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "application/zip") + .set_body_bytes(b"not a zip archive".to_vec()), + ) + .mount(&server) + .await; + + let server_uri = server.uri(); + let tmp_path = tmp.path().to_path_buf(); + let err = tokio::task::spawn_blocking(move || { + sync_openai_plugins_repo_via_http(tmp_path.as_path(), &server_uri) + }) + .await + .expect("sync task should join") + .expect_err("http sync should fail"); + + assert!(err.contains("failed to open curated plugins zip archive")); + assert!(!has_plugins_clone_dirs(tmp.path())); +} + #[tokio::test] async fn sync_openai_plugins_repo_skips_archive_download_when_sha_matches() { let tmp = tempdir().expect("tempdir"); diff --git a/codex-rs/otel/src/metrics/names.rs b/codex-rs/otel/src/metrics/names.rs index d4f5760d5..8dbf60b12 100644 --- a/codex-rs/otel/src/metrics/names.rs +++ b/codex-rs/otel/src/metrics/names.rs @@ -27,6 +27,8 @@ pub const TURN_NETWORK_PROXY_METRIC: &str = "codex.turn.network_proxy"; pub const TURN_TOOL_CALL_METRIC: &str = "codex.turn.tool.call"; pub const TURN_TOKEN_USAGE_METRIC: &str = "codex.turn.token_usage"; pub const PROFILE_USAGE_METRIC: &str = "codex.profile.usage"; +pub const CURATED_PLUGINS_STARTUP_SYNC_METRIC: &str = "codex.plugins.startup_sync"; +pub const CURATED_PLUGINS_STARTUP_SYNC_FINAL_METRIC: &str = "codex.plugins.startup_sync.final"; /// Total runtime of a startup prewarm attempt until it completes, tagged by final status. pub const STARTUP_PREWARM_DURATION_METRIC: &str = "codex.startup_prewarm.duration_ms"; /// Age of the startup prewarm attempt when the first real turn resolves it, tagged by outcome.