From c3cdf3c00793252cd93dd9790679ee49d1bd40b3 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 1 Jun 2026 20:46:54 +0200 Subject: [PATCH] Throttle repeated rollout compression runs (#25659) ## Why [#25089](https://github.com/openai/codex/pull/25089) introduced the background worker that compresses cold archived rollouts, and [#25654](https://github.com/openai/codex/pull/25654) made that pass faster once it starts. But the worker still deleted `rollout-compression.lock` on successful exit, so the existing six-hour staleness window only helped with overlapping or crashed workers. Each new local thread-store initialization could immediately rescan archived rollouts even if a full pass had just finished. This change keeps the existing marker around long enough to throttle redundant reruns. The worker is still best-effort, but it no longer does repeated startup scans when nothing new is eligible for compression. ## What Changed - Replace the drop-scoped `CompressionLock` with a `CompressionRunMarker` that claims the existing `.tmp/rollout-compression.lock` path and leaves it in place after success. - Reuse the existing six-hour staleness window to block both overlapping starts and immediate reruns, while still letting a stale marker be reclaimed. - Update the worker docs and debug logging to describe the new "already running or recently ran" behavior. - Extend the rollout compression tests to assert that a successful run leaves the marker behind and that a fresh marker suppresses a new run. ## Validation - `just test -p codex-rollout` --- codex-rs/rollout/src/compression.rs | 57 ++++++++++++++--------- codex-rs/rollout/src/compression_tests.rs | 37 +++++++++++++-- 2 files changed, 68 insertions(+), 26 deletions(-) diff --git a/codex-rs/rollout/src/compression.rs b/codex-rs/rollout/src/compression.rs index d9d08b226..8211f79d6 100644 --- a/codex-rs/rollout/src/compression.rs +++ b/codex-rs/rollout/src/compression.rs @@ -24,8 +24,8 @@ static TEMP_COUNTER: AtomicU64 = AtomicU64::new(0); /// Starts a best-effort background job that compresses cold local rollout files. /// /// The worker is fire-and-forget: failures are logged, startup is not blocked, -/// and a process-wide lock under `codex_home` prevents overlapping compression -/// runs from the same local store. +/// and a run marker under `codex_home` prevents overlapping or too-frequent +/// compression runs from the same local store. pub fn spawn_rollout_compression_worker(codex_home: PathBuf) { worker::spawn(codex_home) } @@ -246,10 +246,10 @@ mod worker { const TEMP_SUFFIX: &str = ".tmp"; const COMPRESSION_LEVEL: i32 = 3; const MIN_ROLLOUT_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60); - const GLOBAL_LOCK_STALE_AFTER: Duration = Duration::from_secs(6 * 60 * 60); - const TEMP_FILE_STALE_AFTER: Duration = GLOBAL_LOCK_STALE_AFTER; + const RUN_MARKER_STALE_AFTER: Duration = Duration::from_secs(6 * 60 * 60); + const TEMP_FILE_STALE_AFTER: Duration = RUN_MARKER_STALE_AFTER; const WORKER_MAX_RUNTIME: Duration = Duration::from_secs(5 * 60 * 60); - const LOCK_FILE_NAME: &str = "rollout-compression.lock"; + const RUN_MARKER_FILE_NAME: &str = "rollout-compression.lock"; const MAX_CONCURRENT_COMPRESSION_JOBS: usize = 2; #[derive(Default)] @@ -260,17 +260,18 @@ mod worker { failed: usize, } - struct CompressionLock { + pub(super) struct CompressionRunMarker { path: PathBuf, + remove_on_drop: bool, } - impl CompressionLock { - fn try_acquire(codex_home: &Path) -> io::Result> { - let lock_dir = codex_home.join(".tmp"); - std::fs::create_dir_all(lock_dir.as_path())?; - let path = lock_dir.join(LOCK_FILE_NAME); - match create_lock_file(path.as_path()) { - Ok(()) => return Ok(Some(Self { path })), + impl CompressionRunMarker { + pub(super) fn try_claim(codex_home: &Path) -> io::Result> { + let marker_dir = codex_home.join(".tmp"); + std::fs::create_dir_all(marker_dir.as_path())?; + let path = marker_dir.join(RUN_MARKER_FILE_NAME); + match create_run_marker_file(path.as_path()) { + Ok(()) => return Ok(Some(Self::new(path))), Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {} Err(err) => return Err(err), } @@ -279,7 +280,7 @@ mod worker { .and_then(|metadata| metadata.modified()) .ok() .and_then(|modified| SystemTime::now().duration_since(modified).ok()) - .is_some_and(|age| age >= GLOBAL_LOCK_STALE_AFTER); + .is_some_and(|age| age >= RUN_MARKER_STALE_AFTER); if !stale { return Ok(None); } @@ -288,17 +289,30 @@ mod worker { Err(err) if err.kind() == io::ErrorKind::NotFound => {} Err(err) => return Err(err), } - match create_lock_file(path.as_path()) { - Ok(()) => Ok(Some(Self { path })), + match create_run_marker_file(path.as_path()) { + Ok(()) => Ok(Some(Self::new(path))), Err(err) if err.kind() == io::ErrorKind::AlreadyExists => Ok(None), Err(err) => Err(err), } } + + fn new(path: PathBuf) -> Self { + Self { + path, + remove_on_drop: true, + } + } + + pub(super) fn persist(mut self) { + self.remove_on_drop = false; + } } - impl Drop for CompressionLock { + impl Drop for CompressionRunMarker { fn drop(&mut self) { - let _ = std::fs::remove_file(self.path.as_path()); + if self.remove_on_drop { + let _ = std::fs::remove_file(self.path.as_path()); + } } } @@ -321,9 +335,9 @@ mod worker { } pub(super) async fn run(codex_home: PathBuf) -> io::Result<()> { - let Some(_lock) = CompressionLock::try_acquire(codex_home.as_path())? else { + let Some(marker) = CompressionRunMarker::try_claim(codex_home.as_path())? else { debug!( - "rollout compression worker already running for {}", + "rollout compression worker recently ran or is already running for {}", codex_home.display() ); return Ok(()); @@ -340,10 +354,11 @@ mod worker { "rollout compression worker finished: scanned={}, compressed={}, skipped={}, failed={}", stats.scanned, stats.compressed, stats.skipped, stats.failed ); + marker.persist(); Ok(()) } - fn create_lock_file(path: &Path) -> io::Result<()> { + fn create_run_marker_file(path: &Path) -> io::Result<()> { let mut file = std::fs::OpenOptions::new() .write(true) .create_new(true) diff --git a/codex-rs/rollout/src/compression_tests.rs b/codex-rs/rollout/src/compression_tests.rs index cd2618a2f..8eb238053 100644 --- a/codex-rs/rollout/src/compression_tests.rs +++ b/codex-rs/rollout/src/compression_tests.rs @@ -141,6 +141,12 @@ async fn worker_compresses_old_archived_rollouts_only() -> anyhow::Result<()> { assert!(!compressed_rollout_path(&fresh_path).exists()); assert!(!stale_temp.exists()); assert!(fresh_temp.exists()); + assert!( + home.path() + .join(".tmp") + .join("rollout-compression.lock") + .exists() + ); Ok(()) } @@ -323,16 +329,16 @@ async fn worker_skips_existing_compressed_archived_rollouts() -> anyhow::Result< } #[tokio::test] -async fn worker_skips_when_fresh_lock_exists() -> anyhow::Result<()> { +async fn worker_skips_when_fresh_run_marker_exists() -> anyhow::Result<()> { let home = TempDir::new()?; let uuid = Uuid::from_u128(11); let thread_id = ThreadId::from_string(&uuid.to_string())?; let rollout_path = archived_rollout_path(home.path(), "2025-01-03T12-00-00", uuid); - write_rollout(&rollout_path, thread_id, "locked worker")?; + write_rollout(&rollout_path, thread_id, "throttled worker")?; set_old_mtime(&rollout_path)?; - let lock_dir = home.path().join(".tmp"); - fs::create_dir_all(lock_dir.as_path())?; - fs::write(lock_dir.join("rollout-compression.lock"), "locked")?; + let marker_dir = home.path().join(".tmp"); + fs::create_dir_all(marker_dir.as_path())?; + fs::write(marker_dir.join("rollout-compression.lock"), "recent run")?; worker::run(home.path().to_path_buf()).await?; @@ -341,6 +347,27 @@ async fn worker_skips_when_fresh_lock_exists() -> anyhow::Result<()> { Ok(()) } +#[test] +fn run_marker_is_removed_unless_persisted() -> anyhow::Result<()> { + let home = TempDir::new()?; + let marker_path = home.path().join(".tmp").join("rollout-compression.lock"); + + { + let marker = worker::CompressionRunMarker::try_claim(home.path())?; + assert!(marker.is_some()); + } + assert!(!marker_path.exists()); + + let marker = worker::CompressionRunMarker::try_claim(home.path())?; + let Some(marker) = marker else { + panic!("expected run marker claim"); + }; + marker.persist(); + assert!(marker_path.exists()); + assert!(worker::CompressionRunMarker::try_claim(home.path())?.is_none()); + Ok(()) +} + #[tokio::test] async fn find_thread_path_by_id_handles_compressed_rollout_filenames() -> anyhow::Result<()> { let home = TempDir::new()?;