From cbf62f64cbfdca160e7303536ec6ea464d371281 Mon Sep 17 00:00:00 2001 From: stefanstokic-oai Date: Thu, 4 Jun 2026 13:13:05 -0400 Subject: [PATCH] Bound external agent session detection work (#26291) ## Why External agent migration detection parsed and hashed every JSONL session file. For users with many large conversations, launching migration could consume substantial CPU and disk resources. Detection only needs the most recent sessions for the migration UI, so full-content work should be bounded. ## What - Use file modification metadata to select the 50 most recent eligible sessions before parsing JSONL content. - Skip unchanged imported sessions using metadata stored in the import ledger. - Preserve content hashing when metadata indicates a session may have changed. - Stream SHA-256 calculation through a 64 KiB buffer instead of loading an entire session into memory. - Continue detecting older sessions in subsequent batches after newer sessions are imported. ## Validation - `RUST_MIN_STACK=8388608 cargo nextest run --no-fail-fast -p codex-external-agent-sessions` - 20 tests passed. - Benchmarked release builds against 250 valid JSONL sessions totaling 501 MiB: - Median detection time decreased from 1,138.8 ms to 47.0 ms. - CPU instructions decreased by 95.8%. - Both versions returned the expected 50 sessions. The benchmark used warm filesystem caches and measured the reduction in parsing, hashing, and CPU work. --- .../external-agent-sessions/src/detect.rs | 209 +++++++++++++++--- .../external-agent-sessions/src/ledger.rs | 112 ++++++++-- 2 files changed, 271 insertions(+), 50 deletions(-) diff --git a/codex-rs/external-agent-sessions/src/detect.rs b/codex-rs/external-agent-sessions/src/detect.rs index 27613eed1..f42c90766 100644 --- a/codex-rs/external-agent-sessions/src/detect.rs +++ b/codex-rs/external-agent-sessions/src/detect.rs @@ -1,7 +1,10 @@ use crate::ExternalAgentSessionMigration; use crate::ledger::load_import_ledger; +use crate::ledger::save_import_ledger; use crate::now_unix_seconds; use crate::summarize_session; +use std::cmp::Reverse; +use std::collections::BinaryHeap; use std::fs; use std::io; use std::path::Path; @@ -10,12 +13,6 @@ use std::time::Duration; const SESSION_IMPORT_MAX_COUNT: usize = 50; const SESSION_IMPORT_MAX_AGE: Duration = Duration::from_secs(30 * 24 * 60 * 60); -#[derive(Debug)] -struct SessionCandidate { - latest_timestamp: i64, - migration: ExternalAgentSessionMigration, -} - pub fn detect_recent_sessions( external_agent_home: &Path, codex_home: &Path, @@ -26,8 +23,9 @@ pub fn detect_recent_sessions( } let now = now_unix_seconds(); - let ledger = load_import_ledger(codex_home)?; - let mut candidates = Vec::new(); + let mut ledger = load_import_ledger(codex_home)?; + let source_states = ledger.source_states(); + let mut file_candidates = BinaryHeap::with_capacity(SESSION_IMPORT_MAX_COUNT + 1); for project_entry in fs::read_dir(projects_root)? { let Ok(project_entry) = project_entry else { continue; @@ -47,44 +45,67 @@ pub fn detect_recent_sessions( if path.extension().and_then(|value| value.to_str()) != Some("jsonl") { continue; } - let Ok(Some(summary)) = summarize_session(&path) else { + let Ok(metadata) = entry.metadata() else { continue; }; - let Ok(has_been_imported) = ledger.contains_current_source(&path) else { + let Ok(modified_at) = metadata.modified() else { continue; }; - if has_been_imported { + let Ok(modified_at) = modified_at.duration_since(std::time::UNIX_EPOCH) else { + continue; + }; + if (modified_at.as_secs() as i64) + < now.saturating_sub(SESSION_IMPORT_MAX_AGE.as_secs() as i64) + { continue; } - if !is_recent_enough(now, summary.latest_timestamp) { + let Ok(modified_at_nanos) = i64::try_from(modified_at.as_nanos()) else { + continue; + }; + let Ok(source_path) = fs::canonicalize(&path) else { + continue; + }; + if let Some(state) = source_states.get(source_path.as_path()) + && (state.source_modified_at == Some(modified_at_nanos) + || state.source_modified_at.is_none() + && modified_at.as_secs() as i64 <= state.imported_at) + { continue; } - let migration = summary.migration; - if !migration.cwd.is_dir() { - continue; + file_candidates.push((Reverse(modified_at_nanos), path)); + if file_candidates.len() > SESSION_IMPORT_MAX_COUNT { + file_candidates.pop(); } - candidates.push(SessionCandidate { - latest_timestamp: summary.latest_timestamp, - migration, - }); } } - candidates.sort_by(|left, right| { - right - .latest_timestamp - .cmp(&left.latest_timestamp) - .then_with(|| left.migration.path.cmp(&right.migration.path)) - }); - candidates.truncate(SESSION_IMPORT_MAX_COUNT); - Ok(candidates - .into_iter() - .map(|candidate| candidate.migration) - .collect()) -} + drop(source_states); + let file_candidates = file_candidates.into_sorted_vec(); + let mut migrations = Vec::new(); + let mut ledger_changed = false; + for (modified_at, path) in file_candidates { + match ledger.refresh_current_source(&path, modified_at.0) { + Ok(false) => {} + Ok(true) => { + ledger_changed = true; + continue; + } + Err(_) => continue, + } + let Ok(Some(summary)) = summarize_session(&path) else { + continue; + }; + let migration = summary.migration; + if !migration.cwd.is_dir() { + continue; + } + migrations.push(migration); + } + if ledger_changed { + save_import_ledger(codex_home, &ledger)?; + } -fn is_recent_enough(now: i64, latest_timestamp: i64) -> bool { - latest_timestamp >= now.saturating_sub(SESSION_IMPORT_MAX_AGE.as_secs() as i64) + Ok(migrations) } #[cfg(test)] @@ -93,7 +114,10 @@ mod tests { use crate::ledger::record_imported_session; use codex_protocol::ThreadId; use serde_json::Value as JsonValue; + use std::fs::FileTimes; + use std::fs::OpenOptions; use std::path::Path; + use std::time::SystemTime; use tempfile::TempDir; #[test] @@ -207,11 +231,11 @@ mod tests { } #[test] - fn ignores_old_sessions() { + fn uses_file_modification_time_for_recency() { let root = TempDir::new().expect("tempdir"); let external_agent_home = root.path().join(".external"); let project_root = root.path().join("repo"); - write_session( + let session_path = write_session( &external_agent_home, &project_root, "session.jsonl", @@ -223,6 +247,34 @@ mod tests { )], ); + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!( + sessions, + vec![ExternalAgentSessionMigration { + path: session_path, + cwd: project_root, + title: Some("hello".to_string()), + }] + ); + } + + #[test] + fn ignores_sessions_with_old_file_modification_time() { + let root = TempDir::new().expect("tempdir"); + let external_agent_home = root.path().join(".external"); + let project_root = root.path().join("repo"); + let session_path = write_session( + &external_agent_home, + &project_root, + "session.jsonl", + &[record("user", "hello", &project_root)], + ); + set_modified_at( + &session_path, + SystemTime::UNIX_EPOCH + Duration::from_secs(/*secs*/ 1), + ); + assert!( detect_recent_sessions(&external_agent_home, root.path()) .expect("detect") @@ -230,6 +282,84 @@ mod tests { ); } + #[test] + fn detects_sessions_in_batches() { + let root = TempDir::new().expect("tempdir"); + let external_agent_home = root.path().join(".external"); + let project_root = root.path().join("repo"); + let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + let modified_at = SystemTime::now(); + let mut expected = Vec::new(); + for index in 0..=SESSION_IMPORT_MAX_COUNT { + let file_name = format!("{index:02}-session.jsonl"); + let title = format!("session {index}"); + let path = write_session( + &external_agent_home, + &project_root, + &file_name, + &[record_at("user", &title, &project_root, ×tamp)], + ); + set_modified_at( + &path, + modified_at - Duration::from_secs(/*secs*/ index as u64), + ); + expected.push(ExternalAgentSessionMigration { + path, + cwd: project_root.clone(), + title: Some(title), + }); + } + let oldest_session = expected.pop().expect("oldest session"); + let mut all_sessions = expected.clone(); + all_sessions.push(oldest_session.clone()); + + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!(sessions, expected); + for session in sessions { + record_imported_session(root.path(), &session.path, ThreadId::new()) + .expect("record import"); + } + + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!(sessions, vec![oldest_session.clone()]); + for session in sessions { + record_imported_session(root.path(), &session.path, ThreadId::new()) + .expect("record import"); + } + + let changed_at = SystemTime::now() + + Duration::from_secs(/*secs*/ SESSION_IMPORT_MAX_COUNT as u64 + 1); + for (index, session) in all_sessions.iter().enumerate() { + let title = session.title.as_deref().expect("session title"); + std::fs::write( + &session.path, + jsonl(&[ + record("user", title, &project_root), + record("assistant", "updated", &project_root), + ]), + ) + .expect("update session"); + set_modified_at( + &session.path, + changed_at - Duration::from_secs(/*secs*/ index as u64), + ); + } + + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!(sessions, expected); + for session in sessions { + record_imported_session(root.path(), &session.path, ThreadId::new()) + .expect("record import"); + } + + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!(sessions, vec![oldest_session]); + } + #[test] fn skips_already_imported_current_session_versions() { let root = TempDir::new().expect("tempdir"); @@ -300,6 +430,15 @@ mod tests { session_path } + fn set_modified_at(path: &Path, modified_at: SystemTime) { + OpenOptions::new() + .write(true) + .open(path) + .expect("open session") + .set_times(FileTimes::new().set_modified(modified_at)) + .expect("set session modified time"); + } + fn record(role: &str, text: &str, cwd: &Path) -> JsonValue { let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true); record_at(role, text, cwd, ×tamp) diff --git a/codex-rs/external-agent-sessions/src/ledger.rs b/codex-rs/external-agent-sessions/src/ledger.rs index 84679581c..45ff97bcf 100644 --- a/codex-rs/external-agent-sessions/src/ledger.rs +++ b/codex-rs/external-agent-sessions/src/ledger.rs @@ -4,12 +4,16 @@ use serde::Deserialize; use serde::Serialize; use sha2::Digest; use sha2::Sha256; +use std::collections::HashMap; use std::fs; +use std::fs::File; use std::io; +use std::io::Read; use std::path::Path; use std::path::PathBuf; const SESSION_IMPORT_LEDGER_FILE: &str = "external_agent_session_imports.json"; +const SESSION_HASH_BUFFER_SIZE: usize = 64 * 1024; #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub(super) struct ImportedExternalAgentSessionLedger { @@ -22,6 +26,14 @@ struct ImportedExternalAgentSessionRecord { content_sha256: String, imported_thread_id: ThreadId, imported_at: i64, + #[serde(default)] + source_modified_at: Option, +} + +#[derive(Debug, Clone, Copy)] +pub(super) struct ImportedSourceState { + pub source_modified_at: Option, + pub imported_at: i64, } pub fn has_current_session_been_imported( @@ -39,30 +51,82 @@ pub fn record_imported_session( let mut ledger = load_import_ledger(codex_home)?; let source_path = canonical_source_path(source_path)?; let content_sha256 = session_content_sha256(&source_path)?; - if ledger - .records - .iter() - .any(|record| record.source_path == source_path && record.content_sha256 == content_sha256) - { - return Ok(()); + let source_modified_at = session_modified_at(&source_path)?; + if let Some(index) = ledger.records.iter().rposition(|record| { + record.source_path == source_path && record.content_sha256 == content_sha256 + }) { + let mut record = ledger.records.remove(index); + record.imported_thread_id = imported_thread_id; + record.imported_at = now_unix_seconds(); + record.source_modified_at = source_modified_at; + ledger.records.push(record); + } else { + ledger.records.push(ImportedExternalAgentSessionRecord { + source_path, + content_sha256, + imported_thread_id, + imported_at: now_unix_seconds(), + source_modified_at, + }); } - ledger.records.push(ImportedExternalAgentSessionRecord { - source_path, - content_sha256, - imported_thread_id, - imported_at: now_unix_seconds(), - }); save_import_ledger(codex_home, &ledger) } impl ImportedExternalAgentSessionLedger { + pub(super) fn source_states(&self) -> HashMap<&Path, ImportedSourceState> { + let mut states = HashMap::new(); + for record in &self.records { + states.insert( + record.source_path.as_path(), + ImportedSourceState { + source_modified_at: record.source_modified_at, + imported_at: record.imported_at, + }, + ); + } + states + } + pub(super) fn contains_current_source(&self, source_path: &Path) -> io::Result { let source_path = canonical_source_path(source_path)?; + if !self + .records + .iter() + .any(|record| record.source_path == source_path) + { + return Ok(false); + } let content_sha256 = session_content_sha256(&source_path)?; Ok(self.records.iter().any(|record| { record.source_path == source_path && record.content_sha256 == content_sha256 })) } + + pub(super) fn refresh_current_source( + &mut self, + source_path: &Path, + source_modified_at: i64, + ) -> io::Result { + let source_path = canonical_source_path(source_path)?; + if !self + .records + .iter() + .any(|record| record.source_path == source_path) + { + return Ok(false); + } + let content_sha256 = session_content_sha256(&source_path)?; + let Some(index) = self.records.iter().rposition(|record| { + record.source_path == source_path && record.content_sha256 == content_sha256 + }) else { + return Ok(false); + }; + let mut record = self.records.remove(index); + record.imported_at = now_unix_seconds(); + record.source_modified_at = Some(source_modified_at); + self.records.push(record); + Ok(true) + } } pub(super) fn load_import_ledger( @@ -84,7 +148,7 @@ pub(super) fn load_import_ledger( }) } -fn save_import_ledger( +pub(super) fn save_import_ledger( codex_home: &Path, ledger: &ImportedExternalAgentSessionLedger, ) -> io::Result<()> { @@ -103,6 +167,24 @@ fn canonical_source_path(path: &Path) -> io::Result { } fn session_content_sha256(path: &Path) -> io::Result { - let contents = fs::read(path)?; - Ok(format!("{:x}", Sha256::digest(contents))) + let mut file = File::open(path)?; + let mut hasher = Sha256::new(); + let mut buffer = [0; SESSION_HASH_BUFFER_SIZE]; + loop { + let read = file.read(&mut buffer)?; + if read == 0 { + break; + } + hasher.update(&buffer[..read]); + } + let digest = hasher.finalize(); + Ok(format!("{digest:x}")) +} + +fn session_modified_at(path: &Path) -> io::Result> { + Ok(fs::metadata(path)? + .modified()? + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|duration| i64::try_from(duration.as_nanos()).ok())) }