diff --git a/codex-rs/external-agent-sessions/src/detect.rs b/codex-rs/external-agent-sessions/src/detect.rs index 27613eed1..f42c90766 100644 --- a/codex-rs/external-agent-sessions/src/detect.rs +++ b/codex-rs/external-agent-sessions/src/detect.rs @@ -1,7 +1,10 @@ use crate::ExternalAgentSessionMigration; use crate::ledger::load_import_ledger; +use crate::ledger::save_import_ledger; use crate::now_unix_seconds; use crate::summarize_session; +use std::cmp::Reverse; +use std::collections::BinaryHeap; use std::fs; use std::io; use std::path::Path; @@ -10,12 +13,6 @@ use std::time::Duration; const SESSION_IMPORT_MAX_COUNT: usize = 50; const SESSION_IMPORT_MAX_AGE: Duration = Duration::from_secs(30 * 24 * 60 * 60); -#[derive(Debug)] -struct SessionCandidate { - latest_timestamp: i64, - migration: ExternalAgentSessionMigration, -} - pub fn detect_recent_sessions( external_agent_home: &Path, codex_home: &Path, @@ -26,8 +23,9 @@ pub fn detect_recent_sessions( } let now = now_unix_seconds(); - let ledger = load_import_ledger(codex_home)?; - let mut candidates = Vec::new(); + let mut ledger = load_import_ledger(codex_home)?; + let source_states = ledger.source_states(); + let mut file_candidates = BinaryHeap::with_capacity(SESSION_IMPORT_MAX_COUNT + 1); for project_entry in fs::read_dir(projects_root)? { let Ok(project_entry) = project_entry else { continue; @@ -47,44 +45,67 @@ pub fn detect_recent_sessions( if path.extension().and_then(|value| value.to_str()) != Some("jsonl") { continue; } - let Ok(Some(summary)) = summarize_session(&path) else { + let Ok(metadata) = entry.metadata() else { continue; }; - let Ok(has_been_imported) = ledger.contains_current_source(&path) else { + let Ok(modified_at) = metadata.modified() else { continue; }; - if has_been_imported { + let Ok(modified_at) = modified_at.duration_since(std::time::UNIX_EPOCH) else { + continue; + }; + if (modified_at.as_secs() as i64) + < now.saturating_sub(SESSION_IMPORT_MAX_AGE.as_secs() as i64) + { continue; } - if !is_recent_enough(now, summary.latest_timestamp) { + let Ok(modified_at_nanos) = i64::try_from(modified_at.as_nanos()) else { + continue; + }; + let Ok(source_path) = fs::canonicalize(&path) else { + continue; + }; + if let Some(state) = source_states.get(source_path.as_path()) + && (state.source_modified_at == Some(modified_at_nanos) + || state.source_modified_at.is_none() + && modified_at.as_secs() as i64 <= state.imported_at) + { continue; } - let migration = summary.migration; - if !migration.cwd.is_dir() { - continue; + file_candidates.push((Reverse(modified_at_nanos), path)); + if file_candidates.len() > SESSION_IMPORT_MAX_COUNT { + file_candidates.pop(); } - candidates.push(SessionCandidate { - latest_timestamp: summary.latest_timestamp, - migration, - }); } } - candidates.sort_by(|left, right| { - right - .latest_timestamp - .cmp(&left.latest_timestamp) - .then_with(|| left.migration.path.cmp(&right.migration.path)) - }); - candidates.truncate(SESSION_IMPORT_MAX_COUNT); - Ok(candidates - .into_iter() - .map(|candidate| candidate.migration) - .collect()) -} + drop(source_states); + let file_candidates = file_candidates.into_sorted_vec(); + let mut migrations = Vec::new(); + let mut ledger_changed = false; + for (modified_at, path) in file_candidates { + match ledger.refresh_current_source(&path, modified_at.0) { + Ok(false) => {} + Ok(true) => { + ledger_changed = true; + continue; + } + Err(_) => continue, + } + let Ok(Some(summary)) = summarize_session(&path) else { + continue; + }; + let migration = summary.migration; + if !migration.cwd.is_dir() { + continue; + } + migrations.push(migration); + } + if ledger_changed { + save_import_ledger(codex_home, &ledger)?; + } -fn is_recent_enough(now: i64, latest_timestamp: i64) -> bool { - latest_timestamp >= now.saturating_sub(SESSION_IMPORT_MAX_AGE.as_secs() as i64) + Ok(migrations) } #[cfg(test)] @@ -93,7 +114,10 @@ mod tests { use crate::ledger::record_imported_session; use codex_protocol::ThreadId; use serde_json::Value as JsonValue; + use std::fs::FileTimes; + use std::fs::OpenOptions; use std::path::Path; + use std::time::SystemTime; use tempfile::TempDir; #[test] @@ -207,11 +231,11 @@ mod tests { } #[test] - fn ignores_old_sessions() { + fn uses_file_modification_time_for_recency() { let root = TempDir::new().expect("tempdir"); let external_agent_home = root.path().join(".external"); let project_root = root.path().join("repo"); - write_session( + let session_path = write_session( &external_agent_home, &project_root, "session.jsonl", @@ -223,6 +247,34 @@ mod tests { )], ); + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!( + sessions, + vec![ExternalAgentSessionMigration { + path: session_path, + cwd: project_root, + title: Some("hello".to_string()), + }] + ); + } + + #[test] + fn ignores_sessions_with_old_file_modification_time() { + let root = TempDir::new().expect("tempdir"); + let external_agent_home = root.path().join(".external"); + let project_root = root.path().join("repo"); + let session_path = write_session( + &external_agent_home, + &project_root, + "session.jsonl", + &[record("user", "hello", &project_root)], + ); + set_modified_at( + &session_path, + SystemTime::UNIX_EPOCH + Duration::from_secs(/*secs*/ 1), + ); + assert!( detect_recent_sessions(&external_agent_home, root.path()) .expect("detect") @@ -230,6 +282,84 @@ mod tests { ); } + #[test] + fn detects_sessions_in_batches() { + let root = TempDir::new().expect("tempdir"); + let external_agent_home = root.path().join(".external"); + let project_root = root.path().join("repo"); + let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + let modified_at = SystemTime::now(); + let mut expected = Vec::new(); + for index in 0..=SESSION_IMPORT_MAX_COUNT { + let file_name = format!("{index:02}-session.jsonl"); + let title = format!("session {index}"); + let path = write_session( + &external_agent_home, + &project_root, + &file_name, + &[record_at("user", &title, &project_root, ×tamp)], + ); + set_modified_at( + &path, + modified_at - Duration::from_secs(/*secs*/ index as u64), + ); + expected.push(ExternalAgentSessionMigration { + path, + cwd: project_root.clone(), + title: Some(title), + }); + } + let oldest_session = expected.pop().expect("oldest session"); + let mut all_sessions = expected.clone(); + all_sessions.push(oldest_session.clone()); + + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!(sessions, expected); + for session in sessions { + record_imported_session(root.path(), &session.path, ThreadId::new()) + .expect("record import"); + } + + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!(sessions, vec![oldest_session.clone()]); + for session in sessions { + record_imported_session(root.path(), &session.path, ThreadId::new()) + .expect("record import"); + } + + let changed_at = SystemTime::now() + + Duration::from_secs(/*secs*/ SESSION_IMPORT_MAX_COUNT as u64 + 1); + for (index, session) in all_sessions.iter().enumerate() { + let title = session.title.as_deref().expect("session title"); + std::fs::write( + &session.path, + jsonl(&[ + record("user", title, &project_root), + record("assistant", "updated", &project_root), + ]), + ) + .expect("update session"); + set_modified_at( + &session.path, + changed_at - Duration::from_secs(/*secs*/ index as u64), + ); + } + + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!(sessions, expected); + for session in sessions { + record_imported_session(root.path(), &session.path, ThreadId::new()) + .expect("record import"); + } + + let sessions = detect_recent_sessions(&external_agent_home, root.path()).expect("detect"); + + assert_eq!(sessions, vec![oldest_session]); + } + #[test] fn skips_already_imported_current_session_versions() { let root = TempDir::new().expect("tempdir"); @@ -300,6 +430,15 @@ mod tests { session_path } + fn set_modified_at(path: &Path, modified_at: SystemTime) { + OpenOptions::new() + .write(true) + .open(path) + .expect("open session") + .set_times(FileTimes::new().set_modified(modified_at)) + .expect("set session modified time"); + } + fn record(role: &str, text: &str, cwd: &Path) -> JsonValue { let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true); record_at(role, text, cwd, ×tamp) diff --git a/codex-rs/external-agent-sessions/src/ledger.rs b/codex-rs/external-agent-sessions/src/ledger.rs index 84679581c..45ff97bcf 100644 --- a/codex-rs/external-agent-sessions/src/ledger.rs +++ b/codex-rs/external-agent-sessions/src/ledger.rs @@ -4,12 +4,16 @@ use serde::Deserialize; use serde::Serialize; use sha2::Digest; use sha2::Sha256; +use std::collections::HashMap; use std::fs; +use std::fs::File; use std::io; +use std::io::Read; use std::path::Path; use std::path::PathBuf; const SESSION_IMPORT_LEDGER_FILE: &str = "external_agent_session_imports.json"; +const SESSION_HASH_BUFFER_SIZE: usize = 64 * 1024; #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub(super) struct ImportedExternalAgentSessionLedger { @@ -22,6 +26,14 @@ struct ImportedExternalAgentSessionRecord { content_sha256: String, imported_thread_id: ThreadId, imported_at: i64, + #[serde(default)] + source_modified_at: Option, +} + +#[derive(Debug, Clone, Copy)] +pub(super) struct ImportedSourceState { + pub source_modified_at: Option, + pub imported_at: i64, } pub fn has_current_session_been_imported( @@ -39,30 +51,82 @@ pub fn record_imported_session( let mut ledger = load_import_ledger(codex_home)?; let source_path = canonical_source_path(source_path)?; let content_sha256 = session_content_sha256(&source_path)?; - if ledger - .records - .iter() - .any(|record| record.source_path == source_path && record.content_sha256 == content_sha256) - { - return Ok(()); + let source_modified_at = session_modified_at(&source_path)?; + if let Some(index) = ledger.records.iter().rposition(|record| { + record.source_path == source_path && record.content_sha256 == content_sha256 + }) { + let mut record = ledger.records.remove(index); + record.imported_thread_id = imported_thread_id; + record.imported_at = now_unix_seconds(); + record.source_modified_at = source_modified_at; + ledger.records.push(record); + } else { + ledger.records.push(ImportedExternalAgentSessionRecord { + source_path, + content_sha256, + imported_thread_id, + imported_at: now_unix_seconds(), + source_modified_at, + }); } - ledger.records.push(ImportedExternalAgentSessionRecord { - source_path, - content_sha256, - imported_thread_id, - imported_at: now_unix_seconds(), - }); save_import_ledger(codex_home, &ledger) } impl ImportedExternalAgentSessionLedger { + pub(super) fn source_states(&self) -> HashMap<&Path, ImportedSourceState> { + let mut states = HashMap::new(); + for record in &self.records { + states.insert( + record.source_path.as_path(), + ImportedSourceState { + source_modified_at: record.source_modified_at, + imported_at: record.imported_at, + }, + ); + } + states + } + pub(super) fn contains_current_source(&self, source_path: &Path) -> io::Result { let source_path = canonical_source_path(source_path)?; + if !self + .records + .iter() + .any(|record| record.source_path == source_path) + { + return Ok(false); + } let content_sha256 = session_content_sha256(&source_path)?; Ok(self.records.iter().any(|record| { record.source_path == source_path && record.content_sha256 == content_sha256 })) } + + pub(super) fn refresh_current_source( + &mut self, + source_path: &Path, + source_modified_at: i64, + ) -> io::Result { + let source_path = canonical_source_path(source_path)?; + if !self + .records + .iter() + .any(|record| record.source_path == source_path) + { + return Ok(false); + } + let content_sha256 = session_content_sha256(&source_path)?; + let Some(index) = self.records.iter().rposition(|record| { + record.source_path == source_path && record.content_sha256 == content_sha256 + }) else { + return Ok(false); + }; + let mut record = self.records.remove(index); + record.imported_at = now_unix_seconds(); + record.source_modified_at = Some(source_modified_at); + self.records.push(record); + Ok(true) + } } pub(super) fn load_import_ledger( @@ -84,7 +148,7 @@ pub(super) fn load_import_ledger( }) } -fn save_import_ledger( +pub(super) fn save_import_ledger( codex_home: &Path, ledger: &ImportedExternalAgentSessionLedger, ) -> io::Result<()> { @@ -103,6 +167,24 @@ fn canonical_source_path(path: &Path) -> io::Result { } fn session_content_sha256(path: &Path) -> io::Result { - let contents = fs::read(path)?; - Ok(format!("{:x}", Sha256::digest(contents))) + let mut file = File::open(path)?; + let mut hasher = Sha256::new(); + let mut buffer = [0; SESSION_HASH_BUFFER_SIZE]; + loop { + let read = file.read(&mut buffer)?; + if read == 0 { + break; + } + hasher.update(&buffer[..read]); + } + let digest = hasher.finalize(); + Ok(format!("{digest:x}")) +} + +fn session_modified_at(path: &Path) -> io::Result> { + Ok(fs::metadata(path)? + .modified()? + .duration_since(std::time::UNIX_EPOCH) + .ok() + .and_then(|duration| i64::try_from(duration.as_nanos()).ok())) }