plugins: Clean up stale curated plugin sync temp dirs and add sync metrics (#16035)

1. Keep curated plugin staging directories under TempDir ownership until
activation succeeds, so failed git/HTTP sync attempts do not leak
plugins-clone-*.
2. Best-effort clean up stale plugins-clone-* directories before
creating a new staged repo, using a conservative age threshold.
3. Emit OTEL counters for curated plugin startup sync transport attempts
and final outcome across git and HTTP paths.
This commit is contained in:
xl-openai
2026-03-27 14:21:18 -07:00
committed by GitHub
Unverified
parent 8002594ee3
commit 81abb44f68
3 changed files with 292 additions and 14 deletions
+143 -14
View File
@@ -7,8 +7,11 @@ use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use codex_otel::metrics::names::CURATED_PLUGINS_STARTUP_SYNC_FINAL_METRIC;
use codex_otel::metrics::names::CURATED_PLUGINS_STARTUP_SYNC_METRIC;
use reqwest::Client;
use serde::Deserialize;
use tempfile::TempDir;
use tracing::info;
use tracing::warn;
use zip::ZipArchive;
@@ -27,6 +30,8 @@ const CURATED_PLUGINS_RELATIVE_DIR: &str = ".tmp/plugins";
const CURATED_PLUGINS_SHA_FILE: &str = ".tmp/plugins.sha";
const CURATED_PLUGINS_GIT_TIMEOUT: Duration = Duration::from_secs(30);
const CURATED_PLUGINS_HTTP_TIMEOUT: Duration = Duration::from_secs(30);
// Keep this comfortably above a normal sync attempt so we do not race another Codex process.
const CURATED_PLUGINS_STALE_TEMP_DIR_MAX_AGE: Duration = Duration::from_secs(10 * 60);
const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1";
const STARTUP_REMOTE_PLUGIN_SYNC_PREREQUISITE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -63,14 +68,23 @@ fn sync_openai_plugins_repo_with_transport_overrides(
api_base_url: &str,
) -> Result<String, String> {
match sync_openai_plugins_repo_via_git(codex_home, git_binary) {
Ok(remote_sha) => Ok(remote_sha),
Ok(remote_sha) => {
emit_curated_plugins_startup_sync_metric("git", "success");
emit_curated_plugins_startup_sync_final_metric("git", "success");
Ok(remote_sha)
}
Err(err) => {
emit_curated_plugins_startup_sync_metric("git", "failure");
warn!(
error = %err,
git_binary,
"git sync failed for curated plugin sync; falling back to GitHub HTTP"
);
sync_openai_plugins_repo_via_http(codex_home, api_base_url)
let result = sync_openai_plugins_repo_via_http(codex_home, api_base_url);
let status = if result.is_ok() { "success" } else { "failure" };
emit_curated_plugins_startup_sync_metric("http", status);
emit_curated_plugins_startup_sync_final_metric("http", status);
result
}
}
}
@@ -85,7 +99,7 @@ fn sync_openai_plugins_repo_via_git(codex_home: &Path, git_binary: &str) -> Resu
return Ok(remote_sha);
}
let cloned_repo_path = prepare_curated_repo_parent_and_temp_dir(&repo_path)?;
let staged_repo_dir = prepare_curated_repo_parent_and_temp_dir(&repo_path)?;
let clone_output = run_git_command_with_timeout(
Command::new(git_binary)
.env("GIT_OPTIONAL_LOCKS", "0")
@@ -93,21 +107,21 @@ fn sync_openai_plugins_repo_via_git(codex_home: &Path, git_binary: &str) -> Resu
.arg("--depth")
.arg("1")
.arg("https://github.com/openai/plugins.git")
.arg(&cloned_repo_path),
.arg(staged_repo_dir.path()),
"git clone curated plugins repo",
CURATED_PLUGINS_GIT_TIMEOUT,
)?;
ensure_git_success(&clone_output, "git clone curated plugins repo")?;
let cloned_sha = git_head_sha(&cloned_repo_path, git_binary)?;
let cloned_sha = git_head_sha(staged_repo_dir.path(), git_binary)?;
if cloned_sha != remote_sha {
return Err(format!(
"curated plugins clone HEAD mismatch: expected {remote_sha}, got {cloned_sha}"
));
}
ensure_marketplace_manifest_exists(&cloned_repo_path)?;
activate_curated_repo(&repo_path, &cloned_repo_path)?;
ensure_marketplace_manifest_exists(staged_repo_dir.path())?;
activate_curated_repo(&repo_path, staged_repo_dir)?;
write_curated_plugins_sha(&sha_path, &remote_sha)?;
Ok(remote_sha)
}
@@ -129,11 +143,11 @@ fn sync_openai_plugins_repo_via_http(
return Ok(remote_sha);
}
let cloned_repo_path = prepare_curated_repo_parent_and_temp_dir(&repo_path)?;
let staged_repo_dir = prepare_curated_repo_parent_and_temp_dir(&repo_path)?;
let zipball_bytes = runtime.block_on(fetch_curated_repo_zipball(api_base_url, &remote_sha))?;
extract_zipball_to_dir(&zipball_bytes, &cloned_repo_path)?;
ensure_marketplace_manifest_exists(&cloned_repo_path)?;
activate_curated_repo(&repo_path, &cloned_repo_path)?;
extract_zipball_to_dir(&zipball_bytes, staged_repo_dir.path())?;
ensure_marketplace_manifest_exists(staged_repo_dir.path())?;
activate_curated_repo(&repo_path, staged_repo_dir)?;
write_curated_plugins_sha(&sha_path, &remote_sha)?;
Ok(remote_sha)
}
@@ -227,7 +241,7 @@ async fn write_startup_remote_plugin_sync_marker(codex_home: &Path) -> std::io::
tokio::fs::write(marker_path, b"ok\n").await
}
fn prepare_curated_repo_parent_and_temp_dir(repo_path: &Path) -> Result<PathBuf, String> {
fn prepare_curated_repo_parent_and_temp_dir(repo_path: &Path) -> Result<TempDir, String> {
let Some(parent) = repo_path.parent() else {
return Err(format!(
"failed to determine curated plugins parent directory for {}",
@@ -240,6 +254,7 @@ fn prepare_curated_repo_parent_and_temp_dir(repo_path: &Path) -> Result<PathBuf,
parent.display()
)
})?;
remove_stale_curated_repo_temp_dirs(parent, CURATED_PLUGINS_STALE_TEMP_DIR_MAX_AGE);
let clone_dir = tempfile::Builder::new()
.prefix("plugins-clone-")
@@ -250,7 +265,120 @@ fn prepare_curated_repo_parent_and_temp_dir(repo_path: &Path) -> Result<PathBuf,
parent.display()
)
})?;
Ok(clone_dir.keep())
Ok(clone_dir)
}
fn remove_stale_curated_repo_temp_dirs(parent: &Path, max_age: Duration) {
let entries = match std::fs::read_dir(parent) {
Ok(entries) => entries,
Err(err) => {
warn!(
error = %err,
parent = %parent.display(),
"failed to list curated plugins temp directory parent for stale cleanup"
);
return;
}
};
for entry in entries.flatten() {
let file_type = match entry.file_type() {
Ok(file_type) => file_type,
Err(err) => {
warn!(
error = %err,
path = %entry.path().display(),
"failed to inspect curated plugins temp directory entry"
);
continue;
}
};
if !file_type.is_dir() {
continue;
}
let path = entry.path();
let is_plugins_clone_dir = path
.file_name()
.and_then(|name| name.to_str())
.is_some_and(|name| name.starts_with("plugins-clone-"));
if !is_plugins_clone_dir {
continue;
}
let metadata = match entry.metadata() {
Ok(metadata) => metadata,
Err(err) => {
warn!(
error = %err,
path = %path.display(),
"failed to read curated plugins temp directory metadata"
);
continue;
}
};
let modified = match metadata.modified() {
Ok(modified) => modified,
Err(err) => {
warn!(
error = %err,
path = %path.display(),
"failed to read curated plugins temp directory modification time"
);
continue;
}
};
let age = match modified.elapsed() {
Ok(age) => age,
Err(err) => {
warn!(
error = %err,
path = %path.display(),
"failed to compute curated plugins temp directory age"
);
continue;
}
};
if age < max_age {
continue;
}
if let Err(err) = std::fs::remove_dir_all(&path) {
warn!(
error = %err,
path = %path.display(),
"failed to remove stale curated plugins temp directory"
);
}
}
}
fn emit_curated_plugins_startup_sync_metric(transport: &'static str, status: &'static str) {
emit_curated_plugins_startup_sync_counter(
CURATED_PLUGINS_STARTUP_SYNC_METRIC,
transport,
status,
);
}
fn emit_curated_plugins_startup_sync_final_metric(transport: &'static str, status: &'static str) {
emit_curated_plugins_startup_sync_counter(
CURATED_PLUGINS_STARTUP_SYNC_FINAL_METRIC,
transport,
status,
);
}
fn emit_curated_plugins_startup_sync_counter(
metric_name: &str,
transport: &'static str,
status: &'static str,
) {
let Some(metrics) = codex_otel::metrics::global() else {
return;
};
let tags = [("transport", transport), ("status", status)];
let _ = metrics.counter(metric_name, /*inc*/ 1, &tags);
}
fn ensure_marketplace_manifest_exists(repo_path: &Path) -> Result<(), String> {
@@ -263,7 +391,8 @@ fn ensure_marketplace_manifest_exists(repo_path: &Path) -> Result<(), String> {
))
}
fn activate_curated_repo(repo_path: &Path, staged_repo_path: &Path) -> Result<(), String> {
fn activate_curated_repo(repo_path: &Path, staged_repo_dir: TempDir) -> Result<(), String> {
let staged_repo_path = staged_repo_dir.path();
if repo_path.exists() {
let parent = repo_path.parent().ok_or_else(|| {
format!(
@@ -7,6 +7,7 @@ use crate::plugins::test_support::write_file;
use crate::plugins::test_support::write_openai_curated_marketplace;
use pretty_assertions::assert_eq;
use std::io::Write;
use std::path::Path;
use tempfile::tempdir;
use wiremock::Mock;
use wiremock::MockServer;
@@ -17,6 +18,21 @@ use wiremock::matchers::path;
use zip::ZipWriter;
use zip::write::SimpleFileOptions;
fn has_plugins_clone_dirs(codex_home: &Path) -> bool {
let Ok(entries) = std::fs::read_dir(codex_home.join(".tmp")) else {
return false;
};
entries.flatten().any(|entry| {
let path = entry.path();
path.is_dir()
&& path
.file_name()
.and_then(|name| name.to_str())
.is_some_and(|name| name.starts_with("plugins-clone-"))
})
}
#[test]
fn curated_plugins_repo_path_uses_codex_home_tmp_dir() {
let tmp = tempdir().expect("tempdir");
@@ -38,6 +54,49 @@ fn read_curated_plugins_sha_reads_trimmed_sha_file() {
);
}
#[cfg(unix)]
#[test]
fn remove_stale_curated_repo_temp_dirs_removes_only_matching_directories() {
use std::os::unix::ffi::OsStrExt;
use std::time::SystemTime;
fn set_dir_mtime(path: &Path, age: Duration) -> Result<(), Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
let modified_at = now.saturating_sub(age);
let tv_sec = i64::try_from(modified_at.as_secs())?;
let ts = libc::timespec { tv_sec, tv_nsec: 0 };
let times = [ts, ts];
let c_path = std::ffi::CString::new(path.as_os_str().as_bytes())?;
let result = unsafe { libc::utimensat(libc::AT_FDCWD, c_path.as_ptr(), times.as_ptr(), 0) };
if result != 0 {
return Err(std::io::Error::last_os_error().into());
}
Ok(())
}
let tmp = tempdir().expect("tempdir");
let parent = tmp.path().join(".tmp");
let stale_clone_dir = parent.join("plugins-clone-stale");
let fresh_clone_dir = parent.join("plugins-clone-fresh");
let unrelated_dir = parent.join("plugins-cache");
std::fs::create_dir_all(&stale_clone_dir).expect("create stale clone dir");
std::fs::create_dir_all(&fresh_clone_dir).expect("create fresh clone dir");
std::fs::create_dir_all(&unrelated_dir).expect("create unrelated dir");
set_dir_mtime(
&stale_clone_dir,
CURATED_PLUGINS_STALE_TEMP_DIR_MAX_AGE + Duration::from_secs(60),
)
.expect("age stale clone dir");
set_dir_mtime(&fresh_clone_dir, Duration::ZERO).expect("age fresh clone dir");
remove_stale_curated_repo_temp_dirs(&parent, CURATED_PLUGINS_STALE_TEMP_DIR_MAX_AGE);
assert!(!stale_clone_dir.exists());
assert!(fresh_clone_dir.is_dir());
assert!(unrelated_dir.is_dir());
}
#[cfg(unix)]
#[test]
fn sync_openai_plugins_repo_prefers_git_when_available() {
@@ -229,6 +288,94 @@ exit 1
assert_eq!(read_curated_plugins_sha(tmp.path()).as_deref(), Some(sha));
}
#[cfg(unix)]
#[test]
fn sync_openai_plugins_repo_via_git_cleans_up_staged_dir_on_clone_failure() {
use std::os::unix::fs::PermissionsExt;
let tmp = tempdir().expect("tempdir");
let bin_dir = tempfile::Builder::new()
.prefix("fake-git-partial-fail-")
.tempdir()
.expect("tempdir");
let git_path = bin_dir.path().join("git");
let sha = "0123456789abcdef0123456789abcdef01234567";
std::fs::write(
&git_path,
format!(
r#"#!/bin/sh
if [ "$1" = "ls-remote" ]; then
printf '%s\tHEAD\n' "{sha}"
exit 0
fi
if [ "$1" = "clone" ]; then
dest="$5"
mkdir -p "$dest/.git"
echo "fatal: early EOF" >&2
exit 128
fi
echo "unexpected git invocation: $@" >&2
exit 1
"#
),
)
.expect("write fake git");
let mut permissions = std::fs::metadata(&git_path)
.expect("metadata")
.permissions();
permissions.set_mode(0o755);
std::fs::set_permissions(&git_path, permissions).expect("chmod");
let err = sync_openai_plugins_repo_via_git(tmp.path(), git_path.to_str().expect("utf8 path"))
.expect_err("git sync should fail");
assert!(err.contains("fatal: early EOF"));
assert!(!has_plugins_clone_dirs(tmp.path()));
}
#[tokio::test]
async fn sync_openai_plugins_repo_via_http_cleans_up_staged_dir_on_extract_failure() {
let tmp = tempdir().expect("tempdir");
let server = MockServer::start().await;
let sha = "0123456789abcdef0123456789abcdef01234567";
Mock::given(method("GET"))
.and(path("/repos/openai/plugins"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"{"default_branch":"main"}"#))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/repos/openai/plugins/git/ref/heads/main"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(format!(r#"{{"object":{{"sha":"{sha}"}}}}"#)),
)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path(format!("/repos/openai/plugins/zipball/{sha}")))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "application/zip")
.set_body_bytes(b"not a zip archive".to_vec()),
)
.mount(&server)
.await;
let server_uri = server.uri();
let tmp_path = tmp.path().to_path_buf();
let err = tokio::task::spawn_blocking(move || {
sync_openai_plugins_repo_via_http(tmp_path.as_path(), &server_uri)
})
.await
.expect("sync task should join")
.expect_err("http sync should fail");
assert!(err.contains("failed to open curated plugins zip archive"));
assert!(!has_plugins_clone_dirs(tmp.path()));
}
#[tokio::test]
async fn sync_openai_plugins_repo_skips_archive_download_when_sha_matches() {
let tmp = tempdir().expect("tempdir");
+2
View File
@@ -27,6 +27,8 @@ pub const TURN_NETWORK_PROXY_METRIC: &str = "codex.turn.network_proxy";
pub const TURN_TOOL_CALL_METRIC: &str = "codex.turn.tool.call";
pub const TURN_TOKEN_USAGE_METRIC: &str = "codex.turn.token_usage";
pub const PROFILE_USAGE_METRIC: &str = "codex.profile.usage";
pub const CURATED_PLUGINS_STARTUP_SYNC_METRIC: &str = "codex.plugins.startup_sync";
pub const CURATED_PLUGINS_STARTUP_SYNC_FINAL_METRIC: &str = "codex.plugins.startup_sync.final";
/// Total runtime of a startup prewarm attempt until it completes, tagged by final status.
pub const STARTUP_PREWARM_DURATION_METRIC: &str = "codex.startup_prewarm.duration_ms";
/// Age of the startup prewarm attempt when the first real turn resolves it, tagged by outcome.