diff --git a/codex-rs/core/src/memories/phase1.rs b/codex-rs/core/src/memories/phase1.rs index a0cf88cc8..1d5fc9e50 100644 --- a/codex-rs/core/src/memories/phase1.rs +++ b/codex-rs/core/src/memories/phase1.rs @@ -69,10 +69,9 @@ struct StageOneOutput { /// Compact summary line used for routing and indexing. #[serde(rename = "rollout_summary")] pub(crate) rollout_summary: String, - /// Optional slug accepted from stage-1 output for forward compatibility. - /// This is currently ignored by downstream storage and naming, which remain thread-id based. + /// Optional slug used to derive rollout summary artifact filenames. #[serde(default, rename = "rollout_slug")] - pub(crate) _rollout_slug: Option, + pub(crate) rollout_slug: Option, } /// Runs memory phase 1 in strict step order: @@ -122,7 +121,7 @@ pub fn output_schema() -> Value { "rollout_slug": { "type": "string" }, "raw_memory": { "type": "string" } }, - "required": ["rollout_summary", "rollout_slug", "raw_memory"], + "required": ["rollout_summary", "raw_memory"], "additionalProperties": false }) } @@ -268,6 +267,7 @@ mod job { thread.updated_at.timestamp(), &stage_one_output.raw_memory, &stage_one_output.rollout_summary, + stage_one_output.rollout_slug.as_deref(), ) .await, token_usage, @@ -348,6 +348,7 @@ mod job { let mut output: StageOneOutput = serde_json::from_str(&result)?; output.raw_memory = redact_secrets(output.raw_memory); output.rollout_summary = redact_secrets(output.rollout_summary); + output.rollout_slug = output.rollout_slug.map(redact_secrets); Ok((output, token_usage)) } @@ -401,6 +402,7 @@ mod job { source_updated_at: i64, raw_memory: &str, rollout_summary: &str, + rollout_slug: Option<&str>, ) -> JobOutcome { let Some(state_db) = session.services.state_db.as_deref() else { return JobOutcome::Failed; @@ -413,6 +415,7 @@ mod job { source_updated_at, raw_memory, rollout_summary, + rollout_slug, ) .await .unwrap_or(false) diff --git a/codex-rs/core/src/memories/storage.rs b/codex-rs/core/src/memories/storage.rs index be888d367..a596ea6f0 100644 --- a/codex-rs/core/src/memories/storage.rs +++ b/codex-rs/core/src/memories/storage.rs @@ -34,7 +34,7 @@ pub(super) async fn sync_rollout_summaries_from_memories( .collect::>(); let keep = retained .iter() - .map(|memory| memory.thread_id.to_string()) + .map(|memory| rollout_summary_file_stem(memory)) .collect::>(); prune_rollout_summaries(root, &keep).await?; @@ -113,10 +113,10 @@ async fn prune_rollout_summaries(root: &Path, keep: &BTreeSet) -> std::i let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { continue; }; - let Some(thread_id) = extract_thread_id_from_rollout_summary_filename(file_name) else { + let Some(stem) = file_name.strip_suffix(".md") else { continue; }; - if !keep.contains(thread_id) + if !keep.contains(stem) && let Err(err) = tokio::fs::remove_file(&path).await && err.kind() != std::io::ErrorKind::NotFound { @@ -134,7 +134,8 @@ async fn write_rollout_summary_for_thread( root: &Path, memory: &Stage1Output, ) -> std::io::Result<()> { - let path = rollout_summaries_dir(root).join(format!("{}.md", memory.thread_id)); + let file_stem = rollout_summary_file_stem(memory); + let path = rollout_summaries_dir(root).join(format!("{file_stem}.md")); let mut body = String::new(); writeln!(body, "thread_id: {}", memory.thread_id) @@ -155,7 +156,85 @@ async fn write_rollout_summary_for_thread( tokio::fs::write(path, body).await } -fn extract_thread_id_from_rollout_summary_filename(file_name: &str) -> Option<&str> { - let stem = file_name.strip_suffix(".md")?; - if stem.is_empty() { None } else { Some(stem) } +fn rollout_summary_file_stem(memory: &Stage1Output) -> String { + const ROLLOUT_SLUG_MAX_LEN: usize = 20; + + let thread_id = memory.thread_id.to_string(); + let Some(raw_slug) = memory.rollout_slug.as_deref() else { + return thread_id; + }; + + let mut slug = String::with_capacity(ROLLOUT_SLUG_MAX_LEN); + for ch in raw_slug.chars() { + if slug.len() >= ROLLOUT_SLUG_MAX_LEN { + break; + } + + if ch.is_ascii_alphanumeric() { + slug.push(ch.to_ascii_lowercase()); + } else { + slug.push('_'); + } + } + + while slug.ends_with('_') { + slug.pop(); + } + + if slug.is_empty() { + thread_id + } else { + format!("{thread_id}-{slug}") + } +} + +#[cfg(test)] +mod tests { + use super::rollout_summary_file_stem; + use chrono::TimeZone; + use chrono::Utc; + use codex_protocol::ThreadId; + use codex_state::Stage1Output; + use pretty_assertions::assert_eq; + use std::path::PathBuf; + + fn stage1_output_with_slug(rollout_slug: Option<&str>) -> Stage1Output { + Stage1Output { + thread_id: ThreadId::new(), + source_updated_at: Utc.timestamp_opt(123, 0).single().expect("timestamp"), + raw_memory: "raw memory".to_string(), + rollout_summary: "summary".to_string(), + rollout_slug: rollout_slug.map(ToString::to_string), + cwd: PathBuf::from("/tmp/workspace"), + generated_at: Utc.timestamp_opt(124, 0).single().expect("timestamp"), + } + } + + #[test] + fn rollout_summary_file_stem_uses_thread_id_when_slug_missing() { + let memory = stage1_output_with_slug(None); + let thread_id = memory.thread_id.to_string(); + + assert_eq!(rollout_summary_file_stem(&memory), thread_id); + } + + #[test] + fn rollout_summary_file_stem_sanitizes_and_truncates_slug() { + let memory = + stage1_output_with_slug(Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345")); + let thread_id = memory.thread_id.to_string(); + + assert_eq!( + rollout_summary_file_stem(&memory), + format!("{thread_id}-unsafe_slug_with_spa") + ); + } + + #[test] + fn rollout_summary_file_stem_uses_thread_id_when_slug_is_empty() { + let memory = stage1_output_with_slug(Some("")); + let thread_id = memory.thread_id.to_string(); + + assert_eq!(rollout_summary_file_stem(&memory), thread_id); + } } diff --git a/codex-rs/core/src/memories/tests.rs b/codex-rs/core/src/memories/tests.rs index 5e0068565..900cc2184 100644 --- a/codex-rs/core/src/memories/tests.rs +++ b/codex-rs/core/src/memories/tests.rs @@ -22,7 +22,7 @@ fn memory_root_uses_shared_global_path() { } #[test] -fn stage_one_output_schema_requires_all_declared_properties() { +fn stage_one_output_schema_keeps_rollout_slug_optional() { let schema = crate::memories::phase1::output_schema(); let properties = schema .get("properties") @@ -33,16 +33,17 @@ fn stage_one_output_schema_requires_all_declared_properties() { .and_then(Value::as_array) .expect("required array"); - let mut property_keys = properties.keys().map(String::as_str).collect::>(); - property_keys.sort_unstable(); - let mut required_keys = required .iter() .map(|key| key.as_str().expect("required key string")) .collect::>(); required_keys.sort_unstable(); - assert_eq!(required_keys, property_keys); + assert!( + properties.contains_key("rollout_slug"), + "schema should declare rollout_slug" + ); + assert_eq!(required_keys, vec!["raw_memory", "rollout_summary"]); } #[tokio::test] @@ -67,6 +68,7 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"), raw_memory: "raw memory".to_string(), rollout_summary: "short summary".to_string(), + rollout_slug: None, cwd: PathBuf::from("/tmp/workspace"), generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"), }]; @@ -97,6 +99,83 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only assert!(raw_memories.contains("cwd: /tmp/workspace")); } +#[tokio::test] +async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() { + let dir = tempdir().expect("tempdir"); + let root = dir.path().join("memory"); + ensure_layout(&root).await.expect("ensure layout"); + + let thread_id = ThreadId::new(); + let stale_unslugged_path = rollout_summaries_dir(&root).join(format!("{thread_id}.md")); + let stale_old_slug_path = + rollout_summaries_dir(&root).join(format!("{thread_id}--old-slug.md")); + tokio::fs::write(&stale_unslugged_path, "stale") + .await + .expect("write stale unslugged file"); + tokio::fs::write(&stale_old_slug_path, "stale") + .await + .expect("write stale old-slug file"); + + let memories = vec![Stage1Output { + thread_id, + source_updated_at: Utc.timestamp_opt(200, 0).single().expect("timestamp"), + raw_memory: "raw memory".to_string(), + rollout_summary: "short summary".to_string(), + rollout_slug: Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345".to_string()), + cwd: PathBuf::from("/tmp/workspace"), + generated_at: Utc.timestamp_opt(201, 0).single().expect("timestamp"), + }]; + + sync_rollout_summaries_from_memories( + &root, + &memories, + DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL, + ) + .await + .expect("sync rollout summaries"); + + let mut dir = tokio::fs::read_dir(rollout_summaries_dir(&root)) + .await + .expect("open rollout summaries dir"); + let mut files = Vec::new(); + while let Some(entry) = dir.next_entry().await.expect("read dir entry") { + files.push(entry.file_name().to_string_lossy().to_string()); + } + files.sort_unstable(); + + assert_eq!(files.len(), 1); + let file_name = &files[0]; + let stem = file_name + .strip_suffix(".md") + .expect("rollout summary file should end with .md"); + let slug = stem + .strip_prefix(&format!("{thread_id}-")) + .expect("rollout summary filename should include thread id and slug"); + assert!(slug.len() <= 20, "slug should be capped at 20 chars"); + assert!( + slug.chars() + .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'), + "slug should be file-safe lowercase ascii with underscores" + ); + + let summary = tokio::fs::read_to_string(rollout_summaries_dir(&root).join(file_name)) + .await + .expect("read rollout summary"); + assert!(summary.contains(&format!("thread_id: {thread_id}"))); + assert!( + !tokio::fs::try_exists(&stale_unslugged_path) + .await + .expect("check stale unslugged path"), + "slugged sync should prune stale unslugged filename for same thread" + ); + assert!( + !tokio::fs::try_exists(&stale_old_slug_path) + .await + .expect("check stale old slug path"), + "slugged sync should prune stale slugged filename for same thread" + ); +} + mod phase2 { use crate::CodexAuth; use crate::ThreadManager; @@ -130,6 +209,7 @@ mod phase2 { .expect("valid source_updated_at timestamp"), raw_memory: "raw memory".to_string(), rollout_summary: "rollout summary".to_string(), + rollout_slug: None, cwd: PathBuf::from("/tmp/workspace"), generated_at: chrono::DateTime::::from_timestamp(source_updated_at + 1, 0) .expect("valid generated_at timestamp"), @@ -220,6 +300,7 @@ mod phase2 { source_updated_at, "raw memory", "rollout summary", + None, ) .await .expect("mark stage-1 success"), @@ -572,6 +653,7 @@ mod phase2 { 100, "raw memory", "rollout summary", + None, ) .await .expect("mark stage-1 success"), diff --git a/codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql b/codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql new file mode 100644 index 000000000..9b3a1e077 --- /dev/null +++ b/codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql @@ -0,0 +1,2 @@ +ALTER TABLE stage1_outputs +ADD COLUMN rollout_slug TEXT; diff --git a/codex-rs/state/src/model/memories.rs b/codex-rs/state/src/model/memories.rs index aff702d67..813c99939 100644 --- a/codex-rs/state/src/model/memories.rs +++ b/codex-rs/state/src/model/memories.rs @@ -15,6 +15,7 @@ pub struct Stage1Output { pub source_updated_at: DateTime, pub raw_memory: String, pub rollout_summary: String, + pub rollout_slug: Option, pub cwd: PathBuf, pub generated_at: DateTime, } @@ -25,6 +26,7 @@ pub(crate) struct Stage1OutputRow { source_updated_at: i64, raw_memory: String, rollout_summary: String, + rollout_slug: Option, cwd: String, generated_at: i64, } @@ -36,6 +38,7 @@ impl Stage1OutputRow { source_updated_at: row.try_get("source_updated_at")?, raw_memory: row.try_get("raw_memory")?, rollout_summary: row.try_get("rollout_summary")?, + rollout_slug: row.try_get("rollout_slug")?, cwd: row.try_get("cwd")?, generated_at: row.try_get("generated_at")?, }) @@ -51,6 +54,7 @@ impl TryFrom for Stage1Output { source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?, raw_memory: row.raw_memory, rollout_summary: row.rollout_summary, + rollout_slug: row.rollout_slug, cwd: PathBuf::from(row.cwd), generated_at: epoch_seconds_to_datetime(row.generated_at)?, }) diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 77930f800..4cc9c63b1 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -1143,7 +1143,14 @@ WHERE id = 1 assert!( runtime - .mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum") + .mark_stage1_job_succeeded( + thread_id, + ownership_token.as_str(), + 100, + "raw", + "sum", + None, + ) .await .expect("mark stage1 succeeded"), "stage1 success should finalize for current token" @@ -1492,6 +1499,7 @@ WHERE id = 1 up_to_date.updated_at.timestamp(), "raw", "summary", + None, ) .await .expect("mark up-to-date thread succeeded"), @@ -1715,6 +1723,7 @@ WHERE kind = 'memory_stage1' claim.thread.updated_at.timestamp(), "raw", "summary", + None, ) .await .expect("mark first-batch stage1 success"), @@ -1766,7 +1775,14 @@ WHERE kind = 'memory_stage1' }; assert!( runtime - .mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum") + .mark_stage1_job_succeeded( + thread_id, + ownership_token.as_str(), + 100, + "raw", + "sum", + None, + ) .await .expect("mark stage1 succeeded"), "mark stage1 succeeded should write stage1_outputs" @@ -2058,6 +2074,7 @@ WHERE kind = 'memory_stage1' 100, "raw memory a", "summary a", + None, ) .await .expect("mark stage1 succeeded a"), @@ -2080,6 +2097,7 @@ WHERE kind = 'memory_stage1' 101, "raw memory b", "summary b", + Some("rollout-b"), ) .await .expect("mark stage1 succeeded b"), @@ -2093,9 +2111,11 @@ WHERE kind = 'memory_stage1' assert_eq!(outputs.len(), 2); assert_eq!(outputs[0].thread_id, thread_id_b); assert_eq!(outputs[0].rollout_summary, "summary b"); + assert_eq!(outputs[0].rollout_slug.as_deref(), Some("rollout-b")); assert_eq!(outputs[0].cwd, codex_home.join("workspace-b")); assert_eq!(outputs[1].thread_id, thread_id_a); assert_eq!(outputs[1].rollout_summary, "summary a"); + assert_eq!(outputs[1].rollout_slug, None); assert_eq!(outputs[1].cwd, codex_home.join("workspace-a")); let _ = tokio::fs::remove_dir_all(codex_home).await; @@ -2208,7 +2228,14 @@ VALUES (?, ?, ?, ?, ?) }; assert!( runtime - .mark_stage1_job_succeeded(thread_a, token_a.as_str(), 100, "raw-a", "summary-a") + .mark_stage1_job_succeeded( + thread_a, + token_a.as_str(), + 100, + "raw-a", + "summary-a", + None, + ) .await .expect("mark stage1 succeeded a"), "stage1 success should persist output for thread a" @@ -2224,7 +2251,14 @@ VALUES (?, ?, ?, ?, ?) }; assert!( runtime - .mark_stage1_job_succeeded(thread_b, token_b.as_str(), 101, "raw-b", "summary-b") + .mark_stage1_job_succeeded( + thread_b, + token_b.as_str(), + 101, + "raw-b", + "summary-b", + None, + ) .await .expect("mark stage1 succeeded b"), "stage1 success should persist output for thread b" diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 4d1f6ffd5..c2a7f1fe9 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -191,7 +191,13 @@ LEFT JOIN jobs let rows = sqlx::query( r#" -SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.rollout_summary, so.generated_at +SELECT + so.thread_id, + so.source_updated_at, + so.raw_memory, + so.rollout_summary, + so.rollout_slug, + so.generated_at , COALESCE(t.cwd, '') AS cwd FROM stage1_outputs AS so LEFT JOIN threads AS t @@ -407,6 +413,7 @@ WHERE kind = ? AND job_key = ? /// - sets `status='done'` and `last_success_watermark = input_watermark` /// - upserts `stage1_outputs` for the thread, replacing existing output only /// when `source_updated_at` is newer or equal + /// - persists optional `rollout_slug` for rollout summary artifact naming /// - enqueues/advances the global phase-2 job watermark using /// `source_updated_at` pub async fn mark_stage1_job_succeeded( @@ -416,6 +423,7 @@ WHERE kind = ? AND job_key = ? source_updated_at: i64, raw_memory: &str, rollout_summary: &str, + rollout_slug: Option<&str>, ) -> anyhow::Result { let now = Utc::now().timestamp(); let thread_id = thread_id.to_string(); @@ -454,12 +462,14 @@ INSERT INTO stage1_outputs ( source_updated_at, raw_memory, rollout_summary, + rollout_slug, generated_at -) VALUES (?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(thread_id) DO UPDATE SET source_updated_at = excluded.source_updated_at, raw_memory = excluded.raw_memory, rollout_summary = excluded.rollout_summary, + rollout_slug = excluded.rollout_slug, generated_at = excluded.generated_at WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at "#, @@ -468,6 +478,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at .bind(source_updated_at) .bind(raw_memory) .bind(rollout_summary) + .bind(rollout_slug) .bind(now) .execute(&mut *tx) .await?;