feat: use git-backed workspace diffs for memory consolidation (#18982)

## Why

This PR makes the `morpheus` agent (memory phase 2) use a git diff to
start its consolidation. The workflow is as follows:
1. The agent acquires a lock.
2. If `.codex/memories` does not exist or is not a git root, initialize
everything (and make a first empty commit).
3. Update `raw_memories.md` and `rollout_summaries/` as before.
Basically, we select at most N phase 1 memories based on a given policy.
4. We use git (`gix`) to get a diff between the current state of
`.codex/memories` and the last commit.
5. Dump the diff into `phase2_workspace_diff.md`.
6. Spawn `morpheus` and point it to `phase2_workspace_diff.md`.
7. Wait for `morpheus` to finish.
8. Re-create a new `.git` and make a single commit in it. We do this
because we don't want to preserve history through `.git`, and it is
cheap anyway.
9. We release the lock.

On top of this, we keep the existing retry policies.
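
The diff-then-reset cycle in steps 4 to 8 can be illustrated without git at all. The following is a minimal in-memory sketch, not the implementation in this PR: `Workspace`, `Change`, `diff_against_baseline`, and `reset_baseline` are hypothetical names, and a `BTreeMap` snapshot stands in for the single commit that the real code creates through `gix`.

```rust
use std::collections::BTreeMap;

/// One entry of a workspace diff (hypothetical type, for illustration only).
#[derive(Debug, Clone, PartialEq, Eq)]
enum Change {
    Added(String),
    Modified(String),
    Deleted(String),
}

/// In-memory stand-in for `.codex/memories`: `files` is the worktree and
/// `baseline` plays the role of the single commit kept in `.git`.
#[derive(Debug, Default)]
struct Workspace {
    files: BTreeMap<String, String>,
    baseline: BTreeMap<String, String>,
}

impl Workspace {
    /// Compare the worktree with the baseline snapshot (steps 4 and 5).
    fn diff_against_baseline(&self) -> Vec<Change> {
        let mut changes = Vec::new();
        // Files present in the worktree are either new, changed, or untouched.
        for (path, content) in &self.files {
            match self.baseline.get(path) {
                None => changes.push(Change::Added(path.clone())),
                Some(old) if old != content => changes.push(Change::Modified(path.clone())),
                Some(_) => {}
            }
        }
        // Files that exist only in the baseline were deleted.
        for path in self.baseline.keys() {
            if !self.files.contains_key(path) {
                changes.push(Change::Deleted(path.clone()));
            }
        }
        changes
    }

    /// Drop the old snapshot and record the worktree as the new baseline
    /// (step 8). No history is kept, matching the "one single commit" idea.
    fn reset_baseline(&mut self) {
        self.baseline = self.files.clone();
    }
}
```

An empty result from `diff_against_baseline` corresponds to a clean workspace, in which case there is nothing for the agent to consolidate.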

The goals of this new workflow are:
* Better support for memory extensions such as `chronicle`
* Allow the user to manually edit memories and have those edits considered
by the phase 2 agent

As a follow-up, we will need to add support for user edits made while
`morpheus` is running.

## What Changed

- Added memory workspace helpers that prepare the git baseline, compute
the diff, write `phase2_workspace_diff.md`, and reset the baseline after
successful consolidation.
- Updated Phase 2 to sync current inputs into `raw_memories.md` and
`rollout_summaries/`, prune old extension resources, skip clean
workspaces, and run the consolidation subagent only when the workspace
has changes.
- Tightened Phase 2 job ownership around long-running consolidation with
heartbeats and an ownership check before resetting the baseline.
- Simplified the prompt and state APIs so DB watermarks are bookkeeping,
while workspace dirtiness decides whether consolidation work exists.
- Updated the memory pipeline README and tests for workspace diffs,
extension-resource cleanup, pollution-driven forgetting, selection
ranking, and baseline persistence.
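
The ownership check before the baseline reset can be sketched as a lease that must be renewed successfully before any destructive step. This is an illustrative sketch under stated assumptions: `Lease`, `finish_consolidation`, and `LEASE_SECONDS` are hypothetical, the lease lives in memory rather than in the state DB, and treating an expired lease as lost is an assumption of this sketch rather than documented behavior of `heartbeat_global_phase2_job`.

```rust
/// Hypothetical lease duration in seconds (not the real `JOB_LEASE_SECONDS`).
const LEASE_SECONDS: u64 = 60;

/// Hypothetical in-memory lease standing in for the global Phase 2 job row.
#[derive(Debug)]
struct Lease {
    owner_token: String,
    /// Expiry in seconds on an arbitrary clock supplied by the caller.
    expires_at: u64,
}

impl Lease {
    /// Extend the lease if `token` still owns it and it has not expired.
    /// Returns `false` when ownership was lost.
    fn heartbeat(&mut self, token: &str, now: u64) -> bool {
        if self.owner_token != token || now >= self.expires_at {
            return false;
        }
        self.expires_at = now + LEASE_SECONDS;
        true
    }
}

/// Reset the baseline only after a heartbeat confirms ownership.
/// `baseline_resets` is a counter standing in for the real git reset.
fn finish_consolidation(
    lease: &mut Lease,
    token: &str,
    now: u64,
    baseline_resets: &mut u32,
) -> Result<(), &'static str> {
    if !lease.heartbeat(token, now) {
        // Another worker may own the workspace now, so leave it untouched.
        return Err("lost_ownership");
    }
    *baseline_resets += 1;
    Ok(())
}
```

Checking ownership immediately before the reset narrows the window in which a worker that lost its lease could overwrite a baseline owned by someone else.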

## Verification

- Added/updated coverage in `core/src/memories/tests.rs`,
`core/src/memories/workspace_tests.rs`, `state/src/runtime/memories.rs`,
and `core/tests/suite/memories.rs`.

---------

Co-authored-by: Codex <noreply@openai.com>
jif-oai
2026-04-27 14:32:44 +02:00
committed by GitHub
parent f8c527e529
commit 01ab25dbb5
21 changed files with 1079 additions and 1058 deletions
+33 -27
@@ -70,7 +70,8 @@ Phase 2 consolidates the latest stage-1 outputs into the filesystem memory artif
What it does:
- claims a single global phase-2 job (so only one consolidation runs at a time)
- claims a single global phase-2 lock before touching the memories root (so only one consolidation
inspects or mutates the workspace at a time)
- loads a bounded set of stage-1 outputs from the state DB using phase-2
selection rules:
- ignores memories whose `last_usage` falls outside the configured
@@ -82,53 +83,58 @@ What it does:
- computes a completion watermark from the claimed watermark + newest input timestamps
- syncs local memory artifacts under the memories root:
- `raw_memories.md` (merged raw memories, latest first)
- `rollout_summaries/` (one summary file per retained rollout)
- prunes stale rollout summaries that are no longer retained
- finds old resource files from memory extensions under
`memories_extensions/<extension>/resources/` for extension directories that
have an `instructions.md`, using the memory module retention window
- if there are no Phase 1 inputs or old extension resources, marks the job
successful and exits
- `rollout_summaries/` (one summary file per selected rollout)
- keeps the memories root itself as a git-baseline directory, initialized under
`~/.codex/memories/.git` by `codex-git-utils`
- prunes stale rollout summaries that are no longer selected
- prunes memory extension resource files older than the extension retention
window, so cleanup appears in the workspace diff
- writes `phase2_workspace_diff.md` in the memories root with the git-style diff
from the previous successful Phase 2 baseline to the current worktree
- if the memory workspace has no changes after artifact sync/pruning, marks the
job successful and exits
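
The retention pruning mentioned in the list above depends on the timestamp prefix of each resource file name. The sketch below is a simplified illustration, not the code in `extensions.rs`: it swaps the `chrono` parsing for a plain string comparison, which works because the `%Y-%m-%dT%H-%M-%S` layout sorts lexicographically. `timestamp_prefix` and `should_prune` are hypothetical names, and the check validates only the shape of the prefix, not calendar ranges.

```rust
/// Length of a `%Y-%m-%dT%H-%M-%S` prefix such as `2026-04-06T11-59-59`.
const TS_LEN: usize = 19;

/// Return the timestamp prefix if the file name starts with one.
fn timestamp_prefix(file_name: &str) -> Option<&str> {
    let prefix = file_name.get(..TS_LEN)?;
    // Separators sit at fixed positions; every other byte must be a digit.
    let well_formed = prefix.bytes().enumerate().all(|(i, b)| match i {
        4 | 7 | 13 | 16 => b == b'-',
        10 => b == b'T',
        _ => b.is_ascii_digit(),
    });
    well_formed.then_some(prefix)
}

/// A file is pruned when its timestamp is at or before the cutoff.
/// Files without a recognizable timestamp are always kept.
fn should_prune(file_name: &str, cutoff: &str) -> bool {
    timestamp_prefix(file_name).is_some_and(|ts| ts <= cutoff)
}
```

With a retention window of 7 days and a current time of `2026-04-14T12-00-00`, the cutoff is `2026-04-07T12-00-00`; a file stamped exactly at the cutoff is pruned, matching the `exact_cutoff_file` case in the tests of this commit.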
If there is input, it then:
If the memory workspace has changes, it then:
- spawns an internal consolidation sub-agent
- builds the Phase 2 prompt with a diff of the current Phase 1 input
selection versus the last successful Phase 2 selection (`added`,
`retained`, `removed`)
- includes old extension resource paths in the prompt diff
- builds the Phase 2 prompt with the path to the generated workspace diff
- points the agent at `phase2_workspace_diff.md` for the detailed diff context
- runs it with no approvals, no network, and local write access only
- disables collab for that agent (to prevent recursive delegation)
- watches the agent status and heartbeats the global job lease while it runs
- resets the memory git baseline after the agent completes successfully; the
generated diff file is removed before this reset so deleted content is not
kept in the prompt artifact or unreachable git objects
- marks the phase-2 job success/failure in the state DB when the agent finishes
- prunes old extension resource files after the consolidation agent completes
and the successful Phase 2 job is recorded
Selection diff behavior:
Selection and workspace-diff behavior:
- successful Phase 2 runs mark the exact stage-1 snapshots they consumed with
`selected_for_phase2 = 1` and persist the matching
`selected_for_phase2_source_updated_at`
- Phase 1 upserts preserve the previous `selected_for_phase2` baseline until
the next successful Phase 2 run rewrites it
- the next Phase 2 run compares the current top-N stage-1 inputs against that
prior snapshot selection to label inputs as `added` or `retained`; a
refreshed thread stays `added` until Phase 2 successfully selects its newer
snapshot
- rows that were previously selected but still exist outside the current top-N
selection are surfaced as `removed`
- before the agent starts, local `rollout_summaries/` and `raw_memories.md`
keep the union of the current selection and the previous successful
selection, so removed-thread evidence stays available during forgetting
- Phase 2 loads only the current top-N selected stage-1 inputs, syncs
`rollout_summaries/` and `raw_memories.md` directly to that selection, then
lets the git-style workspace diff surface additions, modifications, and
deletions against the previous successful memory baseline
- when the selected input set is empty, stale `rollout_summaries/` files are
removed and `raw_memories.md` is rewritten to the empty-input placeholder;
consolidated outputs such as `MEMORY.md`, `memory_summary.md`, and `skills/`
are left for the agent to update
Watermark behavior:
- The global phase-2 job claim includes an input watermark representing the latest input timestamp known when the job was claimed.
- The global phase-2 lock does not use DB watermarks as a dirty check; git
workspace dirtiness decides whether an agent needs to run.
- The global phase-2 job row still tracks an input watermark as bookkeeping
for the latest DB input timestamp known when the job was claimed.
- Phase 2 recomputes a `new_watermark` using the max of:
- the claimed watermark
- the newest `source_updated_at` timestamp in the stage-1 inputs it actually loaded
- On success, Phase 2 stores that completion watermark in the DB.
- This lets later phase-2 runs know whether new stage-1 data arrived since the last successful consolidation (dirty vs not dirty), while also avoiding moving the watermark backwards.
- This avoids moving the recorded completion watermark backwards, but does not
decide whether Phase 2 has work.
In practice, this phase is responsible for refreshing the on-disk memory workspace and producing/updating the higher-level consolidated memory outputs.
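
The watermark rule described above reduces to a maximum that can never fall below the claimed value. The following is a simplified sketch modeled on `get_watermark`; `completion_watermark` is a hypothetical name, and plain `i64` timestamps replace the `Stage1Output` rows used by the real function.

```rust
/// Completion watermark: the newest input timestamp, but never lower than
/// the watermark that was claimed with the job.
fn completion_watermark(claimed_watermark: i64, source_updated_at: &[i64]) -> i64 {
    source_updated_at
        .iter()
        .copied()
        .max()
        // With no inputs, fall back to the claimed watermark.
        .unwrap_or(claimed_watermark)
        // Older inputs must not move the watermark backwards.
        .max(claimed_watermark)
}
```

Because workspace dirtiness now decides whether Phase 2 has work, this value serves only as bookkeeping.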
+19 -169
@@ -4,46 +4,27 @@ use chrono::Duration;
use chrono::NaiveDateTime;
use chrono::Utc;
use std::path::Path;
use std::path::PathBuf;
use tracing::warn;
const FILENAME_TS_FORMAT: &str = "%Y-%m-%dT%H-%M-%S";
pub(super) const EXTENSION_RESOURCE_RETENTION_DAYS: i64 = 7;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct RemovedExtensionResource {
pub(super) extension: String,
pub(super) resource_path: String,
pub(super) async fn prune_old_extension_resources(memory_root: &Path) {
prune_old_extension_resources_with_now(memory_root, Utc::now()).await
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct PendingExtensionResourceRemoval {
pub(super) removed: RemovedExtensionResource,
path: PathBuf,
}
pub(super) async fn find_old_extension_resources(
memory_root: &Path,
) -> Vec<PendingExtensionResourceRemoval> {
find_old_extension_resources_with_now(memory_root, Utc::now()).await
}
async fn find_old_extension_resources_with_now(
memory_root: &Path,
now: DateTime<Utc>,
) -> Vec<PendingExtensionResourceRemoval> {
let mut pending = Vec::new();
async fn prune_old_extension_resources_with_now(memory_root: &Path, now: DateTime<Utc>) {
let cutoff = now - Duration::days(EXTENSION_RESOURCE_RETENTION_DAYS);
let extensions_root = memory_extensions_root(memory_root);
let mut extensions = match tokio::fs::read_dir(&extensions_root).await {
Ok(extensions) => extensions,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return pending,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return,
Err(err) => {
warn!(
"failed reading memory extensions root {}: {err}",
extensions_root.display()
);
return pending;
return;
}
};
@@ -52,19 +33,10 @@ async fn find_old_extension_resources_with_now(
let Ok(file_type) = extension_entry.file_type().await else {
continue;
};
if !file_type.is_dir() {
continue;
}
let Some(extension) = extension_path
.file_name()
.and_then(|name| name.to_str())
.map(ToOwned::to_owned)
else {
continue;
};
if !tokio::fs::try_exists(extension_path.join("instructions.md"))
.await
.unwrap_or(false)
if !file_type.is_dir()
|| !tokio::fs::try_exists(extension_path.join("instructions.md"))
.await
.unwrap_or(false)
{
continue;
}
@@ -106,34 +78,14 @@ async fn find_old_extension_resources_with_now(
continue;
}
pending.push(PendingExtensionResourceRemoval {
removed: RemovedExtensionResource {
extension: extension.clone(),
resource_path: format!("resources/{file_name}"),
},
path: resource_file_path,
});
}
}
pending.sort_by(|left, right| {
left.removed
.extension
.cmp(&right.removed.extension)
.then_with(|| left.removed.resource_path.cmp(&right.removed.resource_path))
});
pending
}
pub(super) async fn remove_extension_resources(resources: &[PendingExtensionResourceRemoval]) {
for resource in resources {
if let Err(err) = tokio::fs::remove_file(&resource.path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
warn!(
"failed pruning old memory extension resource {}: {err}",
resource.path.display()
);
if let Err(err) = tokio::fs::remove_file(&resource_file_path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
warn!(
"failed pruning old memory extension resource {}: {err}",
resource_file_path.display()
);
}
}
}
}
@@ -145,107 +97,5 @@ fn resource_timestamp(file_name: &str) -> Option<DateTime<Utc>> {
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
#[tokio::test]
async fn finds_only_old_resources_from_extensions_with_instructions() {
let codex_home = TempDir::new().expect("create temp codex home");
let memory_root = codex_home.path().join("memories");
let extensions_root = memory_extensions_root(&memory_root);
let chronicle_resources = extensions_root.join("chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources)
.await
.expect("create chronicle resources");
tokio::fs::write(
extensions_root.join("chronicle/instructions.md"),
"instructions",
)
.await
.expect("write chronicle instructions");
let now = DateTime::from_naive_utc_and_offset(
NaiveDateTime::parse_from_str("2026-04-14T12-00-00", FILENAME_TS_FORMAT)
.expect("parse now"),
Utc,
);
let old_file = chronicle_resources.join("2026-04-06T11-59-59-abcd-10min-old.md");
let exact_cutoff_file =
chronicle_resources.join("2026-04-07T12-00-00-abcd-10min-cutoff.md");
let recent_file = chronicle_resources.join("2026-04-08T12-00-00-abcd-10min-recent.md");
let invalid_file = chronicle_resources.join("not-a-timestamp.md");
for file in [&old_file, &exact_cutoff_file, &recent_file, &invalid_file] {
tokio::fs::write(file, "resource")
.await
.expect("write chronicle resource");
}
let ignored_resources = extensions_root.join("ignored/resources");
tokio::fs::create_dir_all(&ignored_resources)
.await
.expect("create ignored resources");
let ignored_old_file = ignored_resources.join("2026-04-06T11-59-59-abcd-10min-old.md");
tokio::fs::write(&ignored_old_file, "ignored")
.await
.expect("write ignored resource");
let pending = find_old_extension_resources_with_now(&memory_root, now).await;
assert_eq!(
pending
.iter()
.map(|resource| resource.removed.clone())
.collect::<Vec<_>>(),
vec![
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-06T11-59-59-abcd-10min-old.md".to_string(),
},
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-07T12-00-00-abcd-10min-cutoff.md".to_string(),
},
]
);
assert!(
tokio::fs::try_exists(&old_file)
.await
.expect("check old file before remove")
);
assert!(
tokio::fs::try_exists(&exact_cutoff_file)
.await
.expect("check cutoff file before remove")
);
remove_extension_resources(&pending).await;
assert!(
!tokio::fs::try_exists(&old_file)
.await
.expect("check old file")
);
assert!(
!tokio::fs::try_exists(&exact_cutoff_file)
.await
.expect("check cutoff file")
);
assert!(
tokio::fs::try_exists(&recent_file)
.await
.expect("check recent file")
);
assert!(
tokio::fs::try_exists(&invalid_file)
.await
.expect("check invalid file")
);
assert!(
tokio::fs::try_exists(&ignored_old_file)
.await
.expect("check ignored old file")
);
}
}
#[path = "extensions_tests.rs"]
mod tests;
@@ -0,0 +1,81 @@
use super::*;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
#[tokio::test]
async fn prunes_only_old_resources_from_extensions_with_instructions() {
let codex_home = TempDir::new().expect("create temp codex home");
let memory_root = codex_home.path().join("memories");
let extensions_root = memory_extensions_root(&memory_root);
let chronicle_resources = extensions_root.join("chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources)
.await
.expect("create chronicle resources");
tokio::fs::write(
extensions_root.join("chronicle/instructions.md"),
"instructions",
)
.await
.expect("write chronicle instructions");
let now = DateTime::from_naive_utc_and_offset(
NaiveDateTime::parse_from_str("2026-04-14T12-00-00", FILENAME_TS_FORMAT)
.expect("parse now"),
Utc,
);
let old_file = chronicle_resources.join("2026-04-06T11-59-59-abcd-10min-old.md");
let exact_cutoff_file = chronicle_resources.join("2026-04-07T12-00-00-abcd-10min-cutoff.md");
let recent_file = chronicle_resources.join("2026-04-08T12-00-00-abcd-10min-recent.md");
let invalid_file = chronicle_resources.join("not-a-timestamp.md");
for file in [&old_file, &exact_cutoff_file, &recent_file, &invalid_file] {
tokio::fs::write(file, "resource")
.await
.expect("write chronicle resource");
}
let ignored_resources = extensions_root.join("ignored/resources");
tokio::fs::create_dir_all(&ignored_resources)
.await
.expect("create ignored resources");
let ignored_old_file = ignored_resources.join("2026-04-06T11-59-59-abcd-10min-old.md");
tokio::fs::write(&ignored_old_file, "ignored")
.await
.expect("write ignored resource");
prune_old_extension_resources_with_now(&memory_root, now).await;
assert!(
!tokio::fs::try_exists(&old_file)
.await
.expect("check old file")
);
assert!(
!tokio::fs::try_exists(&exact_cutoff_file)
.await
.expect("check cutoff file")
);
assert!(
tokio::fs::try_exists(&recent_file)
.await
.expect("check recent file")
);
assert!(
tokio::fs::try_exists(&invalid_file)
.await
.expect("check invalid file")
);
assert!(
tokio::fs::try_exists(&ignored_old_file)
.await
.expect("check ignored file")
);
}
#[test]
fn parses_timestamp_prefix_from_resource_file_name() {
let parsed = resource_timestamp("2026-04-06T11-59-59-abcd-10min-old.md")
.expect("timestamp should parse");
assert_eq!(parsed.timestamp(), 1_775_476_799);
assert!(resource_timestamp("not-a-timestamp.md").is_none());
}
+4 -4
@@ -6,6 +6,7 @@
pub(crate) mod citations;
mod control;
mod extensions;
mod phase1;
mod phase2;
pub(crate) mod prompts;
@@ -14,6 +15,7 @@ mod storage;
#[cfg(test)]
mod tests;
pub(crate) mod usage;
mod workspace;
use codex_protocol::openai_models::ReasoningEffort;
@@ -25,13 +27,11 @@ pub use control::clear_memory_roots_contents;
pub(crate) use start::start_memories_startup_task;
mod artifacts {
pub(super) const EXTENSIONS_SUBDIR: &str = "memories_extensions";
pub(super) const EXTENSIONS_SUBDIR: &str = "extensions";
pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
}
mod extensions;
/// Phase 1 (startup extraction).
mod phase_one {
/// Default model used for phase 1.
@@ -111,7 +111,7 @@ fn rollout_summaries_dir(root: &Path) -> PathBuf {
}
fn memory_extensions_root(root: &Path) -> PathBuf {
root.with_file_name(artifacts::EXTENSIONS_SUBDIR)
root.join(artifacts::EXTENSIONS_SUBDIR)
}
fn raw_memories_file(root: &Path) -> PathBuf {
+100 -83
@@ -1,16 +1,17 @@
use crate::agent::AgentStatus;
use crate::agent::status::is_final as is_final_agent_status;
use crate::config::Config;
use crate::memories::extensions::PendingExtensionResourceRemoval;
use crate::memories::extensions::find_old_extension_resources;
use crate::memories::extensions::remove_extension_resources;
use crate::memories::extensions::prune_old_extension_resources;
use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::rollout_summary_file_stem;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use crate::memories::workspace::memory_workspace_diff;
use crate::memories::workspace::prepare_memory_workspace;
use crate::memories::workspace::reset_memory_workspace_baseline;
use crate::memories::workspace::write_workspace_diff;
use crate::session::emit_subagent_session_started;
use crate::session::session::Session;
use codex_config::Constrained;
@@ -25,7 +26,7 @@ use codex_protocol::user_input::UserInput;
use codex_state::Stage1Output;
use codex_state::StateRuntime;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
@@ -59,7 +60,7 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
let max_raw_memories = config.memories.max_raw_memories_for_consolidation;
let max_unused_days = config.memories.max_unused_days;
// 1. Claim the job.
// 1. Claim the global Phase 2 lock before touching the memory workspace.
let claim = match job::claim(session, db).await {
Ok(claim) => claim,
Err(e) => {
@@ -72,71 +73,76 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
}
};
// 2. Get the config for the agent
let Some(agent_config) = agent::get_config(config.clone()) else {
// 2. Ensure the memories root has a git baseline repository.
if let Err(err) = prepare_memory_workspace(&root).await {
tracing::error!("failed preparing memory workspace: {err}");
job::failed(session, db, &claim, "failed_prepare_workspace").await;
return;
}
// 3. Build the locked-down config used by the consolidation agent.
let Some(agent_config) = agent::get_config(config.as_ref()) else {
// If we can't get the config, we can't consolidate.
tracing::error!("failed to get agent config");
job::failed(session, db, &claim, "failed_sandbox_policy").await;
return;
};
// 3. Query the memories
let selection = match db
// 4. Load current DB-backed Phase 2 inputs.
let raw_memories = match db
.get_phase2_input_selection(max_raw_memories, max_unused_days)
.await
{
Ok(selection) => selection,
Ok(raw_memories) => raw_memories,
Err(err) => {
tracing::error!("failed to list stage1 outputs from global: {}", err);
tracing::error!("failed to list stage1 outputs from global: {err}");
job::failed(session, db, &claim, "failed_load_stage1_outputs").await;
return;
}
};
let raw_memories = selection.selected.to_vec();
let artifact_memories = artifact_memories_for_phase2(&selection);
let raw_memory_count = raw_memories.len();
let new_watermark = get_watermark(claim.watermark, &raw_memories);
// 4. Update the file system by syncing the raw memories with the one extracted from DB at
// step 3
// [`rollout_summaries/`]
if let Err(err) =
sync_rollout_summaries_from_memories(&root, &artifact_memories, artifact_memories.len())
.await
{
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_sync_artifacts").await;
// 5. Sync the current inputs into the memory workspace.
if let Err(err) = sync_phase2_workspace_inputs(&root, &raw_memories).await {
tracing::error!("failed syncing phase2 workspace inputs: {err}");
job::failed(session, db, &claim, "failed_sync_workspace_inputs").await;
return;
}
// [`raw_memories.md`]
if let Err(err) =
rebuild_raw_memories_file_from_memories(&root, &artifact_memories, artifact_memories.len())
.await
{
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_rebuild_raw_memories").await;
return;
}
let pending_extension_resource_removals = find_old_extension_resources(&root).await;
let removed_extension_resources = pending_extension_resource_removals
.iter()
.map(|resource| resource.removed.clone())
.collect::<Vec<_>>();
if raw_memories.is_empty() && pending_extension_resource_removals.is_empty() {
// 6. Use git to decide whether the synced workspace actually changed.
let workspace_diff = match memory_workspace_diff(&root).await {
Ok(diff) => diff,
Err(err) => {
tracing::error!("failed checking memory workspace changes: {err}");
job::failed(session, db, &claim, "failed_workspace_status").await;
return;
}
};
if !workspace_diff.has_changes() {
tracing::error!("Phase 2 no changes");
// We check only after sync of the file system.
job::succeed(
session,
db,
&claim,
new_watermark,
&[],
"succeeded_no_input",
&raw_memories,
"succeeded_no_workspace_changes",
)
.await;
return;
}
// 5. Spawn the agent
let prompt = agent::get_prompt(config, &selection, &removed_extension_resources);
// 7. Persist the diff for the consolidation agent to inspect.
if let Err(err) = write_workspace_diff(&root, &workspace_diff).await {
tracing::error!("failed writing memory workspace diff file: {err}");
job::failed(session, db, &claim, "failed_workspace_diff_file").await;
return;
}
// 8. Spawn the consolidation agent.
let prompt = agent::get_prompt(&root);
let source = SessionSource::SubAgent(SubAgentSource::MemoryConsolidation);
let agent_control = session.services.agent_control.detached_registry();
let thread_id = match agent_control
@@ -172,39 +178,34 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
warn!("failed to load memory consolidation thread config for analytics: {thread_id}");
}
// 6. Spawn the agent handler.
// 9. Hand off completion handling, heartbeats, and baseline reset.
agent::handle(
session,
claim,
new_watermark,
raw_memories.clone(),
pending_extension_resource_removals,
root,
thread_id,
agent_control,
phase_two_e2e_timer,
);
// 7. Metrics and logs.
// 10. Emit dispatch metrics.
let counters = Counters {
input: raw_memories.len() as i64,
input: raw_memory_count as i64,
};
emit_metrics(session, counters);
}
fn artifact_memories_for_phase2(
selection: &codex_state::Phase2InputSelection,
) -> Vec<Stage1Output> {
let mut seen = HashSet::new();
let mut memories = selection.selected.clone();
for memory in &selection.selected {
seen.insert(rollout_summary_file_stem(memory));
}
for memory in &selection.previous_selected {
if seen.insert(rollout_summary_file_stem(memory)) {
memories.push(memory.clone());
}
}
memories
async fn sync_phase2_workspace_inputs(
root: &Path,
raw_memories: &[Stage1Output],
) -> std::io::Result<()> {
let raw_memory_count = raw_memories.len();
sync_rollout_summaries_from_memories(root, raw_memories, raw_memory_count).await?;
rebuild_raw_memories_file_from_memories(root, raw_memories, raw_memory_count).await?;
prune_old_extension_resources(root).await;
Ok(())
}
mod job {
@@ -234,7 +235,9 @@ mod job {
);
(ownership_token, input_watermark)
}
codex_state::Phase2JobClaimOutcome::SkippedNotDirty => return Err("skipped_not_dirty"),
codex_state::Phase2JobClaimOutcome::SkippedRetryUnavailable => {
return Err("skipped_retry_unavailable");
}
codex_state::Phase2JobClaimOutcome::SkippedRunning => return Err("skipped_running"),
};
@@ -293,9 +296,9 @@ mod job {
mod agent {
use super::*;
pub(super) fn get_config(config: Arc<Config>) -> Option<Config> {
pub(super) fn get_config(config: &Config) -> Option<Config> {
let root = memory_root(&config.codex_home);
let mut agent_config = config.as_ref().clone();
let mut agent_config = config.clone();
agent_config.cwd = root.clone();
// Consolidation threads must never feed back into phase-1 memory generation.
@@ -342,13 +345,8 @@ mod agent {
Some(agent_config)
}
pub(super) fn get_prompt(
config: Arc<Config>,
selection: &codex_state::Phase2InputSelection,
removed_extension_resources: &[crate::memories::extensions::RemovedExtensionResource],
) -> Vec<UserInput> {
let root = memory_root(&config.codex_home);
let prompt = build_consolidation_prompt(&root, selection, removed_extension_resources);
pub(super) fn get_prompt(root: &Path) -> Vec<UserInput> {
let prompt = build_consolidation_prompt(root);
vec![UserInput::Text {
text: prompt,
text_elements: vec![],
@@ -362,7 +360,7 @@ mod agent {
claim: Claim,
new_watermark: i64,
selected_outputs: Vec<codex_state::Stage1Output>,
pending_extension_resource_removals: Vec<PendingExtensionResourceRemoval>,
memory_root: codex_utils_absolute_path::AbsolutePathBuf,
thread_id: ThreadId,
agent_control: crate::agent::AgentControl,
phase_two_e2e_timer: Option<codex_otel::Timer>,
@@ -386,20 +384,38 @@ mod agent {
};
// Loop the agent until we have the final status.
let final_status = loop_agent(
db.clone(),
claim.token.clone(),
new_watermark,
thread_id,
rx,
)
.await;
let final_status = loop_agent(db.clone(), claim.token.clone(), thread_id, rx).await;
if matches!(final_status, AgentStatus::Completed(_)) {
if let Some(token_usage) = agent_control.get_total_token_usage(thread_id).await {
emit_token_usage_metrics(&session, &token_usage);
}
if job::succeed(
// Do not reset the workspace baseline if we lost the lock.
let Ok(still_owns_lock) = db
.heartbeat_global_phase2_job(&claim.token, phase_two::JOB_LEASE_SECONDS)
.await
.inspect_err(|err| {
tracing::error!(
"failed confirming global memory consolidation ownership before resetting workspace baseline: {err}"
);
})
else {
job::failed(&session, &db, &claim, "failed_confirm_ownership").await;
return;
};
if !still_owns_lock {
tracing::error!(
"lost global memory consolidation ownership before resetting workspace baseline"
);
return;
}
if let Err(err) = reset_memory_workspace_baseline(&memory_root).await {
tracing::error!("failed resetting memory workspace baseline: {err}");
job::failed(&session, &db, &claim, "failed_workspace_commit").await;
return;
}
if !job::succeed(
&session,
&db,
&claim,
@@ -409,7 +425,9 @@ mod agent {
)
.await
{
remove_extension_resources(&pending_extension_resource_removals).await;
tracing::error!(
"failed marking global memory consolidation job succeeded after resetting workspace baseline"
);
}
} else {
job::failed(&session, &db, &claim, "failed_agent").await;
@@ -433,7 +451,6 @@ mod agent {
async fn loop_agent(
db: Arc<StateRuntime>,
token: String,
_new_watermark: i64,
thread_id: ThreadId,
mut rx: watch::Receiver<AgentStatus>,
) -> AgentStatus {
@@ -491,7 +508,7 @@ pub(super) fn get_watermark(
.map(|memory| memory.source_updated_at.timestamp())
.max()
.unwrap_or(claimed_watermark)
.max(claimed_watermark) // todo double check the claimed here.
.max(claimed_watermark)
}
fn emit_metrics(session: &Arc<Session>, counters: Counters) {
+13 -109
@@ -1,18 +1,12 @@
use crate::memories::extensions::EXTENSION_RESOURCE_RETENTION_DAYS;
use crate::memories::extensions::RemovedExtensionResource;
use crate::memories::memory_extensions_root;
use crate::memories::memory_root;
use crate::memories::phase_one;
use crate::memories::storage::rollout_summary_file_stem_from_parts;
use crate::memories::workspace::WORKSPACE_DIFF_FILENAME;
use codex_protocol::openai_models::ModelInfo;
use codex_state::Phase2InputSelection;
use codex_state::Stage1Output;
use codex_state::Stage1OutputRef;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::truncate_text;
use codex_utils_template::Template;
use std::fmt::Write as _;
use std::path::Path;
use std::sync::LazyLock;
use tokio::fs;
@@ -65,9 +59,9 @@ Memory extensions (under {{ memory_extensions_root }}/):
source.
If the user has any memory extensions, you MUST read the instructions for each extension to
determine how to use the memory source. If the Phase 2 diff lists removed memory extension
resources, use that extension-specific deletion diff to remove stale memories derived only from
those resources. If it has no extension folders, continue with the standard memory inputs only.
determine how to use the memory source. If the workspace diff shows deleted extension resource files,
remove stale memories derived only from those resources. If it has no extension folders, continue
with the standard memory inputs only.
"#;
const MEMORY_EXTENSIONS_PRIMARY_INPUTS: &str = r#"
@@ -78,20 +72,17 @@ Under `{{ memory_extensions_root }}/`:
- If extension folders exist, read each instructions.md first and follow it when interpreting
that extension's memory source.
If the Phase 2 diff lists removed memory extension resources, use that extension-specific deletion
diff to remove stale memories derived only from those resources.
If the workspace diff shows deleted memory extension resources, use that extension-specific deletion
signal to remove stale memories derived only from those resources.
"#;
/// Builds the consolidation subagent prompt for a specific memory root.
pub(super) fn build_consolidation_prompt(
memory_root: &Path,
selection: &Phase2InputSelection,
removed_extension_resources: &[RemovedExtensionResource],
) -> String {
pub(super) fn build_consolidation_prompt(memory_root: &Path) -> String {
let memory_extensions_root = memory_extensions_root(memory_root);
let memory_extensions_exist = memory_extensions_root.is_dir();
let memory_root = memory_root.display().to_string();
let memory_extensions_root = memory_extensions_root.display().to_string();
let phase2_workspace_diff_file = WORKSPACE_DIFF_FILENAME.to_string();
let memory_extensions_folder_structure = if memory_extensions_exist {
render_memory_extensions_block(
&MEMORY_EXTENSIONS_FOLDER_STRUCTURE_TEMPLATE,
@@ -108,8 +99,6 @@ pub(super) fn build_consolidation_prompt(
} else {
String::new()
};
let phase2_input_selection =
render_phase2_input_selection(selection, removed_extension_resources);
CONSOLIDATION_PROMPT_TEMPLATE
.render([
("memory_root", memory_root.as_str()),
@@ -121,12 +110,15 @@ pub(super) fn build_consolidation_prompt(
"memory_extensions_primary_inputs",
memory_extensions_primary_inputs.as_str(),
),
("phase2_input_selection", phase2_input_selection.as_str()),
(
"phase2_workspace_diff_file",
phase2_workspace_diff_file.as_str(),
),
])
.unwrap_or_else(|err| {
warn!("failed to render memories consolidation prompt template: {err}");
format!(
"## Memory Phase 2 (Consolidation)\nConsolidate Codex memories in: {memory_root}\n\n{phase2_input_selection}"
"## Memory Phase 2 (Consolidation)\nConsolidate Codex memories in: {memory_root}\n\nRead {phase2_workspace_diff_file} first."
)
})
}
@@ -140,94 +132,6 @@ fn render_memory_extensions_block(template: &Template, memory_extensions_root: &
})
}
fn render_phase2_input_selection(
selection: &Phase2InputSelection,
removed_extension_resources: &[RemovedExtensionResource],
) -> String {
let retained = selection.retained_thread_ids.len();
let added = selection.selected.len().saturating_sub(retained);
let selected = if selection.selected.is_empty() {
"- none".to_string()
} else {
selection
.selected
.iter()
.map(|item| {
render_selected_input_line(
item,
selection.retained_thread_ids.contains(&item.thread_id),
)
})
.collect::<Vec<_>>()
.join("\n")
};
let removed = if selection.removed.is_empty() {
"- none".to_string()
} else {
selection
.removed
.iter()
.map(render_removed_input_line)
.collect::<Vec<_>>()
.join("\n")
};
let mut rendered = format!(
"- selected inputs this run: {}\n- newly added since the last successful Phase 2 run: {added}\n- retained from the last successful Phase 2 run: {retained}\n- removed from the last successful Phase 2 run: {}\n\nCurrent selected Phase 1 inputs:\n{selected}\n\nRemoved from the last successful Phase 2 selection:\n{removed}\n",
selection.selected.len(),
selection.removed.len(),
);
if !removed_extension_resources.is_empty() {
rendered.push_str("\nMemory extension resources removed by retention pruning:\n");
let _ = writeln!(
rendered,
"- retention window: {EXTENSION_RESOURCE_RETENTION_DAYS} days"
);
let mut current_extension = "";
for removed_resource in removed_extension_resources {
if removed_resource.extension != current_extension {
current_extension = &removed_resource.extension;
let _ = writeln!(rendered, "- extension: {current_extension}");
}
let _ = writeln!(rendered, " - {}", removed_resource.resource_path);
}
}
rendered
}
fn render_selected_input_line(item: &Stage1Output, retained: bool) -> String {
let status = if retained { "retained" } else { "added" };
let rollout_summary_file = format!(
"rollout_summaries/{}.md",
rollout_summary_file_stem_from_parts(
item.thread_id,
item.source_updated_at,
item.rollout_slug.as_deref(),
)
);
format!(
"- [{status}] thread_id={}, rollout_summary_file={rollout_summary_file}",
item.thread_id
)
}
fn render_removed_input_line(item: &Stage1OutputRef) -> String {
let rollout_summary_file = format!(
"rollout_summaries/{}.md",
rollout_summary_file_stem_from_parts(
item.thread_id,
item.source_updated_at,
item.rollout_slug.as_deref(),
)
);
format!(
"- thread_id={}, rollout_summary_file={rollout_summary_file}",
item.thread_id
)
}
/// Builds the stage-1 user message containing rollout metadata and content.
///
/// Large rollout payloads are truncated to 70% of the active model's effective
+11 -25
@@ -1,7 +1,5 @@
use super::*;
use crate::memories::extensions::RemovedExtensionResource;
use codex_models_manager::model_info::model_info_from_slug;
use codex_state::Phase2InputSelection;
use core_test_support::PathExt;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
@@ -58,33 +56,21 @@ fn build_stage_one_input_message_uses_default_limit_when_model_context_window_mi
}
#[test]
fn build_consolidation_prompt_includes_removed_extension_resources() {
fn build_consolidation_prompt_points_to_workspace_diff_and_extension_tree() {
let temp = tempdir().unwrap();
let memory_root = temp.path().join("memories");
std::fs::create_dir_all(temp.path().join("memories_extensions")).unwrap();
let removed_extension_resources = vec![
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-06T11-59-59-abcd-10min-old.md".to_string(),
},
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-07T12-00-00-abcd-10min-cutoff.md".to_string(),
},
];
let memory_extensions_root = memory_root.join("extensions");
std::fs::create_dir_all(&memory_extensions_root).unwrap();
let prompt = build_consolidation_prompt(
&memory_root,
&Phase2InputSelection::default(),
&removed_extension_resources,
);
let prompt = build_consolidation_prompt(&memory_root);
assert!(prompt.contains("Memory extension resources removed by retention pruning:"));
assert!(prompt.contains("- retention window: 7 days"));
assert!(prompt.contains("- extension: chronicle"));
assert!(prompt.contains(" - resources/2026-04-06T11-59-59-abcd-10min-old.md"));
assert!(prompt.contains(" - resources/2026-04-07T12-00-00-abcd-10min-cutoff.md"));
assert!(prompt.contains("extension-specific deletion diff"));
assert!(prompt.contains("Memory workspace diff:"));
assert!(prompt.contains("phase2_workspace_diff.md"));
assert!(prompt.contains(&format!(
"Memory extensions (under {}/):",
memory_extensions_root.display()
)));
assert!(prompt.contains("workspace diff shows deleted extension resource files"));
}
#[tokio::test]
-18
@@ -38,24 +38,6 @@ pub(super) async fn sync_rollout_summaries_from_memories(
write_rollout_summary_for_thread(root, memory).await?;
}
if retained.is_empty() {
for file_name in ["MEMORY.md", "memory_summary.md"] {
let path = root.join(file_name);
if let Err(err) = tokio::fs::remove_file(path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
return Err(err);
}
}
let skills_dir = root.join("skills");
if let Err(err) = tokio::fs::remove_dir_all(skills_dir).await
&& err.kind() != std::io::ErrorKind::NotFound
{
return Err(err);
}
}
Ok(())
}
+301 -59
@@ -13,6 +13,7 @@ use codex_state::Stage1Output;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::path::Path;
use std::path::PathBuf;
use tempfile::tempdir;
@@ -130,6 +131,56 @@ async fn clear_memory_root_contents_rejects_symlinked_root() {
);
}
struct ConsolidatedOutputPaths {
memory_index: PathBuf,
memory_summary: PathBuf,
skill: PathBuf,
}
async fn write_consolidated_outputs(root: &Path) -> ConsolidatedOutputPaths {
let paths = ConsolidatedOutputPaths {
memory_index: root.join("MEMORY.md"),
memory_summary: root.join("memory_summary.md"),
skill: root.join("skills/demo/SKILL.md"),
};
tokio::fs::write(&paths.memory_index, "consolidated memory index\n")
.await
.expect("write memory index");
tokio::fs::write(&paths.memory_summary, "consolidated memory summary\n")
.await
.expect("write memory summary");
tokio::fs::create_dir_all(paths.skill.parent().expect("skill parent"))
.await
.expect("create skill dir");
tokio::fs::write(&paths.skill, "consolidated skill\n")
.await
.expect("write skill");
paths
}
async fn assert_consolidated_outputs_exist(paths: &ConsolidatedOutputPaths, context: &str) {
assert!(
tokio::fs::try_exists(&paths.memory_index)
.await
.expect("check memory index existence"),
"{context} should leave MEMORY.md untouched"
);
assert!(
tokio::fs::try_exists(&paths.memory_summary)
.await
.expect("check memory summary existence"),
"{context} should leave memory_summary.md untouched"
);
assert!(
tokio::fs::try_exists(&paths.skill)
.await
.expect("check skill existence"),
"{context} should leave skills untouched"
);
}
#[tokio::test]
async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only() {
let dir = tempdir().expect("tempdir");
@@ -236,6 +287,46 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
assert!(rollout_path_pos < file_pos);
}
#[tokio::test]
async fn sync_empty_inputs_preserves_consolidated_outputs() {
let dir = tempdir().expect("tempdir");
let root = dir.path().join("memory");
ensure_layout(&root).await.expect("ensure layout");
let stale_rollout_summary_path = rollout_summaries_dir(&root).join("stale.md");
tokio::fs::write(&stale_rollout_summary_path, "stale summary\n")
.await
.expect("write stale rollout summary");
let outputs = write_consolidated_outputs(&root).await;
sync_rollout_summaries_from_memories(
&root,
&[],
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
)
.await
.expect("sync empty rollout summaries");
rebuild_raw_memories_file_from_memories(
&root,
&[],
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
)
.await
.expect("rebuild empty raw memories");
assert!(
!tokio::fs::try_exists(&stale_rollout_summary_path)
.await
.expect("check stale rollout summary existence"),
"empty sync should prune stale rollout summaries"
);
let raw_memories = tokio::fs::read_to_string(raw_memories_file(&root))
.await
.expect("read raw memories");
assert_eq!(raw_memories, "# Raw Memories\n\nNo raw memories yet.\n");
assert_consolidated_outputs_exist(&outputs, "empty sync").await;
}
#[tokio::test]
async fn sync_rollout_summaries_uses_timestamp_hash_and_sanitized_slug_filename() {
let dir = tempdir().expect("tempdir");
@@ -422,6 +513,9 @@ mod phase2 {
use crate::memories::phase2;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use crate::memories::workspace::prepare_memory_workspace;
use crate::session::session::Session;
use crate::session::tests::make_session_and_context;
use chrono::Duration as ChronoDuration;
@@ -520,7 +614,7 @@ mod phase2 {
}
}
async fn seed_stage1_output(&self, source_updated_at: i64) {
async fn seed_stage1_output(&self, source_updated_at: i64) -> ThreadId {
let thread_id = ThreadId::new();
let mut metadata_builder = ThreadMetadataBuilder::new(
thread_id,
@@ -569,6 +663,7 @@ mod phase2 {
.expect("mark stage-1 success"),
"stage-1 success should enqueue global consolidation"
);
thread_id
}
async fn shutdown_threads(&self) {
@@ -613,16 +708,85 @@ mod phase2 {
}
#[tokio::test]
async fn dispatch_skips_when_global_job_is_not_dirty() {
async fn dispatch_skips_when_memory_workspace_is_not_dirty() {
let harness = DispatchHarness::new().await;
let root = memory_root(&harness.config.codex_home);
rebuild_raw_memories_file_from_memories(
&root,
&[],
/*max_raw_memories_for_consolidation*/ 0,
)
.await
.expect("write empty raw memories baseline");
let outputs = super::write_consolidated_outputs(&root).await;
prepare_memory_workspace(&root)
.await
.expect("commit empty memory workspace as baseline");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
super::assert_consolidated_outputs_exist(&outputs, "clean no-input phase2").await;
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
}
#[tokio::test]
async fn dispatch_uses_git_dirty_state_without_db_dirty_watermark() {
let harness = DispatchHarness::new().await;
let root = memory_root(&harness.config.codex_home);
rebuild_raw_memories_file_from_memories(
&root,
&[],
/*max_raw_memories_for_consolidation*/ 0,
)
.await
.expect("write empty raw memories baseline");
prepare_memory_workspace(&root)
.await
.expect("commit empty memory workspace as baseline");
let extension_resource = root
.join("extensions")
.join("chronicle")
.join("resources")
.join("2026-04-22T12-00-00-abcd-10min-memory.md");
tokio::fs::create_dir_all(
extension_resource
.parent()
.expect("extension resource parent"),
)
.await
.expect("create extension resource dir");
tokio::fs::write(
root.join("extensions/chronicle/instructions.md"),
"instructions\n",
)
.await
.expect("write extension instructions");
tokio::fs::write(&extension_resource, "extension memory\n")
.await
.expect("write extension resource");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 1);
let workspace_diff = tokio::fs::read_to_string(root.join("phase2_workspace_diff.md"))
.await
.expect("read workspace diff");
assert!(
workspace_diff.contains("- A extensions/chronicle/instructions.md"),
"git-only extension instructions should dirty phase2: {workspace_diff}"
);
assert!(
workspace_diff.contains("- A extensions/chronicle/resources/"),
"git-only extension resource should dirty phase2: {workspace_diff}"
);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 1);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_skips_when_global_job_is_already_running() {
let harness = DispatchHarness::new().await;
@@ -696,7 +860,8 @@ mod phase2 {
assert!(
matches!(
post_dispatch_claim,
Phase2JobClaimOutcome::SkippedRunning | Phase2JobClaimOutcome::SkippedNotDirty
Phase2JobClaimOutcome::SkippedRunning
| Phase2JobClaimOutcome::SkippedRetryUnavailable
),
"stale-lock dispatch should either keep the reclaimed job running or finish it before re-claim"
);
@@ -835,7 +1000,7 @@ mod phase2 {
}
#[tokio::test]
async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() {
async fn dispatch_with_empty_stage1_outputs_spawns_for_workspace_changes() {
let harness = DispatchHarness::new().await;
let root = memory_root(&harness.config.codex_home);
let summaries_dir = rollout_summaries_dir(&root);
@@ -851,25 +1016,7 @@ mod phase2 {
tokio::fs::write(&raw_memories_path, "stale raw memories\n")
.await
.expect("write stale raw memories");
let memory_index_path = root.join("MEMORY.md");
tokio::fs::write(&memory_index_path, "stale memory index\n")
.await
.expect("write stale memory index");
let memory_summary_path = root.join("memory_summary.md");
tokio::fs::write(&memory_summary_path, "stale memory summary\n")
.await
.expect("write stale memory summary");
let stale_skill_file = root.join("skills/demo/SKILL.md");
tokio::fs::create_dir_all(
stale_skill_file
.parent()
.expect("skills subdirectory parent should exist"),
)
.await
.expect("create stale skills dir");
tokio::fs::write(&stale_skill_file, "stale skill\n")
.await
.expect("write stale skill");
let outputs = super::write_consolidated_outputs(&root).await;
harness
.state_db
@@ -889,43 +1036,130 @@ mod phase2 {
.await
.expect("read rebuilt raw memories");
pretty_assertions::assert_eq!(raw_memories, "# Raw Memories\n\nNo raw memories yet.\n");
assert!(
!tokio::fs::try_exists(&memory_index_path)
.await
.expect("check memory index existence"),
"empty consolidation should remove stale MEMORY.md"
);
assert!(
!tokio::fs::try_exists(&memory_summary_path)
.await
.expect("check memory summary existence"),
"empty consolidation should remove stale memory_summary.md"
);
assert!(
!tokio::fs::try_exists(&stale_skill_file)
.await
.expect("check stale skill existence"),
"empty consolidation should remove stale skills artifacts"
);
assert!(
!tokio::fs::try_exists(root.join("skills"))
.await
.expect("check skills dir existence"),
"empty consolidation should remove stale skills directory"
);
super::assert_consolidated_outputs_exist(&outputs, "empty consolidation").await;
let next_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
.await
.expect("claim global job after empty consolidation success");
pretty_assertions::assert_eq!(next_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
.expect("claim global job after empty consolidation dispatch");
pretty_assertions::assert_eq!(next_claim, Phase2JobClaimOutcome::SkippedRunning);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 1);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
pretty_assertions::assert_eq!(thread_ids.len(), 1);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_empty_selected_inputs_preserves_consolidated_outputs() {
let harness = DispatchHarness::new().await;
let source_updated_at = Utc::now().timestamp();
let thread_id = harness.seed_stage1_output(source_updated_at).await;
let root = memory_root(&harness.config.codex_home);
let selected = harness
.state_db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await
.expect("load phase2 input selection");
sync_rollout_summaries_from_memories(&root, &selected, selected.len())
.await
.expect("sync selected rollout summaries");
rebuild_raw_memories_file_from_memories(&root, &selected, selected.len())
.await
.expect("sync selected raw memories");
let outputs = super::write_consolidated_outputs(&root).await;
prepare_memory_workspace(&root)
.await
.expect("commit current memory workspace as baseline");
let claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
.await
.expect("claim global phase2 job");
let Phase2JobClaimOutcome::Claimed {
ownership_token, ..
} = claim
else {
panic!("unexpected phase2 claim outcome: {claim:?}");
};
assert!(
harness
.state_db
.mark_global_phase2_job_succeeded(&ownership_token, source_updated_at, &selected)
.await
.expect("mark phase2 succeeded"),
"phase2 success should update selected baseline"
);
assert!(
harness
.state_db
.mark_thread_memory_mode_polluted(thread_id)
.await
.expect("mark thread polluted"),
"polluted selected thread should enqueue phase2 forgetting"
);
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 1);
super::assert_consolidated_outputs_exist(&outputs, "empty selected phase2").await;
let workspace_diff = tokio::fs::read_to_string(root.join("phase2_workspace_diff.md"))
.await
.expect("read workspace diff");
assert!(
workspace_diff.contains("- D rollout_summaries/"),
"empty selected phase2 should surface deleted rollout summaries: {workspace_diff}"
);
assert!(
!workspace_diff.contains("- D MEMORY.md"),
"empty selected phase2 should not delete MEMORY.md directly: {workspace_diff}"
);
assert!(
!workspace_diff.contains("- D memory_summary.md"),
"empty selected phase2 should not delete memory_summary.md directly: {workspace_diff}"
);
assert!(
!workspace_diff.contains("- D skills/demo/SKILL.md"),
"empty selected phase2 should not delete skills directly: {workspace_diff}"
);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_clean_workspace_preserves_selected_phase2_baseline() {
let harness = DispatchHarness::new().await;
let thread_id = harness.seed_stage1_output(Utc::now().timestamp()).await;
let root = memory_root(&harness.config.codex_home);
let selected = harness
.state_db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await
.expect("load phase2 input selection");
sync_rollout_summaries_from_memories(&root, &selected, selected.len())
.await
.expect("sync selected rollout summaries");
rebuild_raw_memories_file_from_memories(&root, &selected, selected.len())
.await
.expect("sync selected raw memories");
prepare_memory_workspace(&root)
.await
.expect("commit current memory workspace as baseline");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let selected = harness
.state_db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await
.expect("load phase2 input selection after clean workspace success");
pretty_assertions::assert_eq!(selected.len(), 1);
pretty_assertions::assert_eq!(selected[0].thread_id, thread_id);
}
#[tokio::test]
async fn dispatch_marks_job_for_retry_when_sandbox_policy_cannot_be_overridden() {
let harness = DispatchHarness::new().await;
@@ -946,7 +1180,7 @@ mod phase2 {
.try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
.await
.expect("claim global job after sandbox policy failure");
pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedRetryUnavailable);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
@@ -968,7 +1202,7 @@ mod phase2 {
.try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
.await
.expect("claim global job after sync failure");
pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedRetryUnavailable);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
@@ -990,7 +1224,7 @@ mod phase2 {
.try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
.await
.expect("claim global job after rebuild failure");
pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedRetryUnavailable);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
@@ -1067,14 +1301,14 @@ mod phase2 {
let chronicle_resources = config
.codex_home
.join("memories_extensions/chronicle/resources");
.join("memories/extensions/chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources)
.await
.expect("create chronicle resources");
tokio::fs::write(
config
.codex_home
.join("memories_extensions/chronicle/instructions.md"),
.join("memories/extensions/chronicle/instructions.md"),
"instructions",
)
.await
@@ -1095,14 +1329,22 @@ mod phase2 {
.expect("claim global job after spawn failure");
pretty_assertions::assert_eq!(
retry_claim,
Phase2JobClaimOutcome::SkippedNotDirty,
Phase2JobClaimOutcome::SkippedRetryUnavailable,
"spawn failures should leave the job in retry backoff instead of running"
);
assert!(
tokio::fs::try_exists(&old_file)
!tokio::fs::try_exists(&old_file)
.await
.expect("check old extension resource"),
"spawn failures should not prune extension resources before retry"
"old extension resources should still be pruned on failed phase2 attempts"
);
let workspace_diff =
tokio::fs::read_to_string(config.codex_home.join("memories/phase2_workspace_diff.md"))
.await
.expect("read workspace diff");
assert!(
workspace_diff.contains("- D extensions/chronicle/resources/"),
"spawn failures should keep a retryable workspace diff: {workspace_diff}"
);
}
}
+124
@@ -0,0 +1,124 @@
use anyhow::Context;
use codex_git_utils::GitBaselineDiff;
use codex_git_utils::diff_since_latest_init;
use codex_git_utils::ensure_git_baseline_repository;
use codex_git_utils::reset_git_repository;
use std::path::Path;
/// Generated diff file the Phase 2 consolidation agent reads before editing memories.
pub(super) const WORKSPACE_DIFF_FILENAME: &str = "phase2_workspace_diff.md";
const WORKSPACE_DIFF_MAX_BYTES: usize = 4 * 1024 * 1024;
/// Prepares the memory directory for git-baseline diffing.
///
/// This keeps an existing usable `.git/` baseline intact. It initializes a new git baseline when the
/// metadata is missing or unusable, and removes any stale generated `phase2_workspace_diff.md` file
/// so that the next diff does not include a previous prompt artifact.
pub(super) async fn prepare_memory_workspace(root: &Path) -> anyhow::Result<()> {
tokio::fs::create_dir_all(root)
.await
.with_context(|| format!("create memory workspace {}", root.display()))?;
remove_workspace_diff(root).await?;
ensure_git_baseline_repository(root).await?;
Ok(())
}
/// Returns the current workspace diff after removing any stale generated diff artifact.
///
/// The removed file is only `phase2_workspace_diff.md`; memory artifacts and `.git/` metadata are
/// left intact.
pub(super) async fn memory_workspace_diff(root: &Path) -> anyhow::Result<GitBaselineDiff> {
remove_workspace_diff(root).await?;
diff_since_latest_init(root).await
}
/// Writes `phase2_workspace_diff.md` with a bounded git-style diff from the current baseline.
pub(super) async fn write_workspace_diff(
root: &Path,
diff: &GitBaselineDiff,
) -> anyhow::Result<()> {
let path = root.join(WORKSPACE_DIFF_FILENAME);
tokio::fs::write(&path, render_workspace_diff_file(diff))
.await
.with_context(|| format!("write memory workspace diff file {}", path.display()))
}
/// Marks the current memory root as the new baseline.
///
/// The generated diff file is removed before resetting the baseline so deleted memory content is
/// not retained in the prompt artifact or in unreachable git objects.
pub(super) async fn reset_memory_workspace_baseline(root: &Path) -> anyhow::Result<()> {
remove_workspace_diff(root).await?;
reset_git_repository(root).await
}
/// Removes the generated `phase2_workspace_diff.md` prompt artifact.
///
/// This does not remove `.git/`, reset the baseline, or delete memory content. It is used before
/// diffing and before baseline reset so the generated diff file itself is not treated as memory
/// workspace input.
pub(super) async fn remove_workspace_diff(root: &Path) -> anyhow::Result<()> {
let path = root.join(WORKSPACE_DIFF_FILENAME);
match tokio::fs::remove_file(&path).await {
Ok(()) => Ok(()),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err)
.with_context(|| format!("remove memory workspace diff file {}", path.display())),
}
}
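The `NotFound`-tolerant removal above is what makes the cleanup safe to call before every diff and before every baseline reset. A minimal synchronous sketch of the same pattern follows, assuming `std::fs` in place of `tokio::fs`; `remove_file_if_exists` is a hypothetical helper for illustration and is not part of this PR.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Removes `path` if it exists and reports whether a file was actually removed.
/// Hypothetical helper: a missing file is treated as success, any other error is returned.
fn remove_file_if_exists(path: &Path) -> io::Result<bool> {
    match fs::remove_file(path) {
        Ok(()) => Ok(true),
        // The file was already gone, so there is nothing to clean up.
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false),
        Err(err) => Err(err),
    }
}
```

Calling it twice on the same path succeeds both times, which is the property the workspace helpers appear to rely on.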
fn render_workspace_diff_file(diff: &GitBaselineDiff) -> String {
let mut rendered = String::from(
"# Memory Workspace Diff\n\n\
Generated by Codex before Phase 2 memory consolidation. Read this file first and do not edit it.\n\n\
## Status\n",
);
if !diff.has_changes() {
rendered.push_str("- none\n");
return rendered;
}
for change in &diff.changes {
rendered.push_str(&format!("- {} {}\n", change.status.label(), change.path));
}
rendered.push_str("\n## Diff\n\n```diff\n");
append_bounded_diff(&mut rendered, &diff.unified_diff);
rendered.push_str("```\n");
rendered
}
fn append_bounded_diff(rendered: &mut String, diff: &str) {
if diff.len() <= WORKSPACE_DIFF_MAX_BYTES {
rendered.push_str(diff);
if !diff.ends_with('\n') {
rendered.push('\n');
}
return;
}
let boundary = previous_char_boundary(diff, WORKSPACE_DIFF_MAX_BYTES);
rendered.push_str(&diff[..boundary]);
if !rendered.ends_with('\n') {
rendered.push('\n');
}
rendered.push_str(&format!(
"\n[workspace diff truncated at {WORKSPACE_DIFF_MAX_BYTES} bytes]\n"
));
}
fn previous_char_boundary(value: &str, max_bytes: usize) -> usize {
if max_bytes >= value.len() {
return value.len();
}
let mut index = max_bytes;
while !value.is_char_boundary(index) {
index -= 1;
}
index
}
#[cfg(test)]
#[path = "workspace_tests.rs"]
mod tests;
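The bounded append in this file has to cut the diff at a UTF-8 character boundary, because slicing a `&str` in the middle of a multibyte character panics. The sketch below isolates that step under stated assumptions: `truncate_at_char_boundary` is a hypothetical name, and it takes `max_bytes` as a parameter instead of reading `WORKSPACE_DIFF_MAX_BYTES`.

```rust
/// Returns the longest prefix of `value` that is at most `max_bytes` long
/// and ends on a UTF-8 character boundary. Hypothetical standalone helper.
fn truncate_at_char_boundary(value: &str, max_bytes: usize) -> &str {
    if max_bytes >= value.len() {
        return value;
    }
    let mut index = max_bytes;
    // Index 0 is always a character boundary, so this loop terminates.
    while !value.is_char_boundary(index) {
        index -= 1;
    }
    &value[..index]
}
```

For example, `"aé"` is three bytes long (`é` takes two), so a two-byte limit falls inside `é` and the prefix backs up to `"a"`.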
@@ -0,0 +1,78 @@
use super::*;
use codex_git_utils::GitBaselineChange;
use codex_git_utils::GitBaselineChangeStatus;
use pretty_assertions::assert_eq;
use std::fs;
use tempfile::TempDir;
#[test]
fn render_workspace_diff_file_bounds_large_diff() {
let diff = GitBaselineDiff {
changes: vec![GitBaselineChange {
status: GitBaselineChangeStatus::Modified,
path: "MEMORY.md".to_string(),
}],
unified_diff: "a".repeat(WORKSPACE_DIFF_MAX_BYTES + 128),
};
let rendered = render_workspace_diff_file(&diff);
assert!(rendered.contains("- M MEMORY.md"));
assert!(rendered.contains("[workspace diff truncated at 4194304 bytes]"));
assert!(rendered.ends_with("```\n"));
}
#[tokio::test]
async fn reset_memory_workspace_baseline_removes_generated_diff() {
let home = TempDir::new().expect("tempdir");
let root = home.path().join("memories");
prepare_memory_workspace(&root)
.await
.expect("prepare memory workspace");
fs::write(root.join("MEMORY.md"), "memory").expect("write memory");
write_workspace_diff(
&root,
&GitBaselineDiff {
changes: vec![GitBaselineChange {
status: GitBaselineChangeStatus::Added,
path: "MEMORY.md".to_string(),
}],
unified_diff: "+memory\n".to_string(),
},
)
.await
.expect("write workspace diff");
reset_memory_workspace_baseline(&root)
.await
.expect("reset baseline");
assert!(!root.join(WORKSPACE_DIFF_FILENAME).exists());
let diff = memory_workspace_diff(&root)
.await
.expect("load workspace diff");
assert_eq!(diff.changes, Vec::new());
}
#[tokio::test]
async fn prepare_memory_workspace_recovers_unusable_git_dir() {
let home = TempDir::new().expect("tempdir");
let root = home.path().join("memories");
fs::create_dir_all(root.join(".git")).expect("create unusable git dir");
fs::write(root.join("MEMORY.md"), "memory").expect("write memory");
prepare_memory_workspace(&root)
.await
.expect("prepare memory workspace");
let diff = memory_workspace_diff(&root)
.await
.expect("load workspace diff");
assert_eq!(diff.changes, Vec::new());
}
#[test]
fn previous_char_boundary_handles_multibyte_text() {
let text = "aé";
assert_eq!(previous_char_boundary(text, /*max_bytes*/ 2), 1);
}
@@ -143,29 +143,34 @@ Mode selection:
- INCREMENTAL UPDATE: existing artifacts already exist and `raw_memories.md`
mostly contains new additions.
Incremental thread diff snapshot (computed before the current artifact sync rewrites local files):
Memory workspace diff:
**Diff since last consolidation:**
{{ phase2_input_selection }}
The folder `{{ memory_root }}/` is a git repository managed by Codex. Read
`{{ phase2_workspace_diff_file }}` in this same folder first. It contains the git-style diff from
the previous successful Phase 2 baseline to the current worktree. It is generated by Codex for
this run and is not part of the committed memory artifacts.
Incremental update and forgetting mechanism:
- Use the diff provided
- Use the git-style diff in `{{ phase2_workspace_diff_file }}` to identify relevant changed
sections and deleted inputs.
- Every change in `{{ phase2_workspace_diff_file }}` is authoritative and must be propagated and consolidated. If a
change appears to be randomly placed in the files, it is probably a user edit and you shouldn't just drop it.
Make sure to add it to the overall memory consolidation.
- Do not open raw sessions / original rollout transcripts.
- For each added thread id, search it in `raw_memories.md`, read that raw-memory section, and
read the corresponding `rollout_summaries/*.md` file only when needed for stronger evidence,
task placement, or conflict resolution.
- For added or modified `raw_memories.md` and `rollout_summaries/*.md` files, read the changed
raw-memory sections and the corresponding rollout summaries only when needed for stronger
evidence, task placement, or conflict resolution.
- When scanning a raw-memory section, read the task-level `Preference signals:` subsections
first, then the rest of the task blocks.
- For each removed thread id, search it in `MEMORY.md` and delete only the memory supported by
that thread. Use `thread_id=<thread_id>` in `### rollout_summary_files` when available; if not,
fall back to rollout summary filenames plus the corresponding `rollout_summaries/*.md` files.
- If a `MEMORY.md` block contains both removed and undeleted threads, do not delete the whole
block. Remove only the removed thread's references and thread-local guidance, preserve shared
or still-supported content, and split or rewrite the block only if needed to keep the undeleted
threads intact.
- For deleted `rollout_summaries/*.md` or `extensions/*/resources/*.md` files, search their
filenames, paths, and thread ids (when present) in `MEMORY.md`. Delete only memory supported
by deleted inputs.
- If a `MEMORY.md` block contains both deleted and still-present evidence, do not delete the whole
block. Remove only stale references and stale local guidance, preserve shared or still-supported
content, and split or rewrite the block only if needed.
- After `MEMORY.md` cleanup is done, revisit `memory_summary.md` and remove or rewrite stale
summary/index content that was only supported by removed thread ids.
summary/index content that was only supported by deleted files.
Outputs:
Under `{{ memory_root }}/`:
@@ -743,26 +748,28 @@ WORKFLOW
3. INCREMENTAL UPDATE behavior:
- Read existing `MEMORY.md` and `memory_summary.md` first for continuity and to locate
existing references that may need surgical cleanup.
- Use the injected thread-diff snapshot as the first routing pass:
- added thread ids = ingestion queue
- removed thread ids = forgetting / stale-cleanup queue
- Use the injected git-style workspace changes as the first routing pass:
- added/modified `raw_memories.md` and `rollout_summaries/*.md` = ingestion queue
- deleted `rollout_summaries/*.md` and `extensions/*/resources/*.md` = forgetting /
stale-cleanup queue
- Build an index of rollout references already present in existing `MEMORY.md` before
scanning raw memories so you can route net-new evidence into the right blocks.
- Work in this order:
1. For newly added thread ids, search them in `raw_memories.md`, read those sections, and
open the corresponding `rollout_summaries/*.md` files when necessary.
1. For added or modified rollout inputs, search their paths/thread ids in `raw_memories.md`,
read those sections, and open the corresponding `rollout_summaries/*.md` files when
necessary.
2. Route the new signal into existing `MEMORY.md` blocks or create new ones when needed.
3. For removed thread ids, search `MEMORY.md` and surgically delete or rewrite only the
unsupported thread-local memory.
4. If a block mixes removed and undeleted threads, preserve the undeleted-thread content;
split or rewrite the block if that is the cleanest way to delete only the removed part.
3. For deleted inputs, search `MEMORY.md` and surgically delete or rewrite only the
unsupported memory.
4. If a block mixes deleted and still-present evidence, preserve the still-supported content;
split or rewrite the block if that is the cleanest way to delete only the stale part.
5. After `MEMORY.md` is correct, revisit `memory_summary.md` and remove or rewrite stale
summary/index content that no longer has undeleted support.
summary/index content that no longer has current support.
- Integrate new signal into existing artifacts by:
- scanning the newly added raw-memory entries in recency order and identifying which existing blocks they should update
- scanning added or modified raw-memory entries in recency order and identifying which existing blocks they should update
- updating existing knowledge with better/newer evidence
- updating stale or contradicting guidance
- pruning or downgrading memory whose only provenance comes from removed thread ids
- pruning or downgrading memory whose only provenance comes from deleted inputs
- expanding terse old blocks when new summaries/raw memories make the task family clearer
- doing light clustering and merging if needed
- refreshing `MEMORY.md` top-of-file ordering so recent high-utility task families stay easy to find
@@ -774,8 +781,8 @@ WORKFLOW
target, keep its wording, label, and relative order mostly stable. Rewrite/reorder/rename/
split/merge only when fixing a real problem (staleness, ambiguity, schema drift, wrong
boundaries) or when meaningful new evidence materially improves retrieval clarity/searchability.
- Spend most of your deep-dive budget on newly added thread ids and on mixed blocks touched by
removed thread ids. Do not re-read unchanged older threads unless you need them for
- Spend most of your deep-dive budget on added/modified inputs and on mixed blocks touched by
deleted inputs. Do not re-read unchanged older threads unless you need them for
conflict resolution, clustering, or provenance repair.
4. Evidence deep-dive rule (both modes):
@@ -793,8 +800,7 @@ WORKFLOW
evidence, procedural detail, validation signals, and user feedback before finalizing
`MEMORY.md`.
- When deleting stale memory from a mixed block, use the relevant rollout summaries to decide
which details are uniquely supported by removed threads versus still supported by undeleted
threads.
which details are uniquely supported by deleted inputs versus still-supported evidence.
- Use `updated_at` and validation strength together to resolve stale/conflicting notes.
- For user-profile or preference claims, recurrence matters: repeated evidence across
rollouts should generally outrank a single polished but isolated summary.
@@ -811,7 +817,7 @@ WORKFLOW
- remove duplication in memory_summary, skills/, and MEMORY.md
- remove stale or low-signal blocks that are less likely to be useful in the future
- remove or rewrite blocks/task sections whose supporting rollout references point only to
removed thread ids or missing rollout summary files
deleted inputs or missing rollout summary files
- run a global rollout-reference audit on final `MEMORY.md` and fix accidental duplicate
entries / redundant repetition, while preserving intentional multi-task or multi-block
reuse when it adds distinct task-local value
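Note on the routing pass in the updated prompt: added or modified `raw_memories.md` and `rollout_summaries/*.md` feed the ingestion queue, while deleted `rollout_summaries/*.md` and `extensions/*/resources/*.md` feed the forgetting / stale-cleanup queue. The sketch below illustrates that rule only and is not code from this PR. `ChangeStatus`, `Queue`, and `route_change` are hypothetical names, and the path matching is deliberately simplified.

```rust
/// Hypothetical status of one changed path in the workspace diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChangeStatus {
    Added,
    Modified,
    Deleted,
}

/// Hypothetical routing target for a changed path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Queue {
    Ingestion,
    Forgetting,
}

/// True for paths shaped like `rollout_summaries/<name>.md`.
fn is_rollout_summary(path: &str) -> bool {
    path.strip_prefix("rollout_summaries/")
        .is_some_and(|name| !name.contains('/') && name.ends_with(".md"))
}

/// True for paths shaped like `extensions/<ext>/resources/<name>.md`.
fn is_extension_resource(path: &str) -> bool {
    let parts: Vec<&str> = path.split('/').collect();
    matches!(
        parts.as_slice(),
        ["extensions", _, "resources", file] if file.ends_with(".md")
    )
}

/// Routes one change as the prompt excerpt describes; anything the excerpt
/// does not name returns `None` in this sketch.
fn route_change(path: &str, status: ChangeStatus) -> Option<Queue> {
    match status {
        ChangeStatus::Added | ChangeStatus::Modified
            if path == "raw_memories.md" || is_rollout_summary(path) =>
        {
            Some(Queue::Ingestion)
        }
        ChangeStatus::Deleted if is_rollout_summary(path) || is_extension_resource(path) => {
            Some(Queue::Forgetting)
        }
        _ => None,
    }
}
```

Under these assumptions, a deleted `extensions/chronicle/resources/x.md` lands in the forgetting queue and a modified `MEMORY.md` is not routed at all, since it is an output rather than an input.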
+52 -292
@@ -2,6 +2,8 @@ use anyhow::Result;
use chrono::Duration as ChronoDuration;
use chrono::Utc;
use codex_features::Feature;
use codex_git_utils::diff_since_latest_init;
use codex_git_utils::reset_git_repository;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
@@ -11,9 +13,7 @@ use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::ev_web_search_call_done;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::TestCodex;
@@ -27,13 +27,14 @@ use tokio::time::Duration;
use tokio::time::Instant;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -> Result<()> {
async fn memories_startup_phase2_tracks_workspace_diff_across_runs() -> Result<()> {
let server = start_mock_server().await;
let home = Arc::new(TempDir::new()?);
let db = init_state_db(&home).await?;
let memory_root = home.path().join("memories");
let now = Utc::now();
let thread_a = seed_stage1_output(
let _thread_a = seed_stage1_output(
db.as_ref(),
home.path(),
now - ChronoDuration::hours(2),
@@ -43,53 +44,21 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
)
.await?;
let first_phase2 = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-phase2-1"),
ev_assistant_message("msg-phase2-1", "phase2 complete"),
ev_completed("resp-phase2-1"),
]),
let rollout_summaries_root = memory_root.join("rollout_summaries");
tokio::fs::create_dir_all(&rollout_summaries_root).await?;
tokio::fs::write(
memory_root.join("raw_memories.md"),
"# Raw Memories\n\nraw memory A\n",
)
.await;
.await?;
tokio::fs::write(
rollout_summaries_root.join("rollout-a.md"),
"git_branch: branch-rollout-a\n\nrollout summary A\n",
)
.await?;
reset_git_repository(&memory_root).await?;
let first = build_test_codex(&server, home.clone()).await?;
let first_request = wait_for_single_request(&first_phase2).await;
let first_prompt = phase2_prompt_text(&first_request);
assert!(
first_prompt.contains("- selected inputs this run: 1"),
"expected selected count in first prompt: {first_prompt}"
);
assert!(
first_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
"expected added count in first prompt: {first_prompt}"
);
assert!(
first_prompt.contains("- removed from the last successful Phase 2 run: 0"),
"expected removed count in first prompt: {first_prompt}"
);
assert!(
first_prompt.contains(&format!("- [added] thread_id={thread_a},")),
"expected thread A to be marked added: {first_prompt}"
);
assert!(
first_prompt.contains("Removed from the last successful Phase 2 selection:\n- none"),
"expected no removed items in first prompt: {first_prompt}"
);
wait_for_phase2_success(db.as_ref(), thread_a).await?;
let memory_root = home.path().join("memories");
let raw_memories = tokio::fs::read_to_string(memory_root.join("raw_memories.md")).await?;
assert!(raw_memories.contains("raw memory A"));
assert!(!raw_memories.contains("raw memory B"));
let rollout_summaries = read_rollout_summary_bodies(&memory_root).await?;
assert_eq!(rollout_summaries.len(), 1);
assert!(rollout_summaries[0].contains("rollout summary A"));
assert!(rollout_summaries[0].contains("git_branch: branch-rollout-a"));
shutdown_test_codex(&first).await?;
let thread_b = seed_stage1_output(
let _thread_b = seed_stage1_output(
db.as_ref(),
home.path(),
now - ChronoDuration::hours(1),
@@ -99,46 +68,30 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
)
.await?;
let second_phase2 = mount_sse_once(
let phase2 = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-phase2-2"),
ev_assistant_message("msg-phase2-2", "phase2 complete"),
ev_completed("resp-phase2-2"),
ev_response_created("resp-phase2"),
ev_assistant_message("msg-phase2", "phase2 complete"),
ev_completed("resp-phase2"),
]),
)
.await;
let second = build_test_codex(&server, home.clone()).await?;
let second_request = wait_for_single_request(&second_phase2).await;
let second_prompt = phase2_prompt_text(&second_request);
let codex = build_test_codex(&server, home.clone()).await?;
let request = wait_for_single_request(&phase2).await;
let prompt = phase2_prompt_text(&request);
assert!(
second_prompt.contains("- selected inputs this run: 1"),
"expected selected count in second prompt: {second_prompt}"
);
assert!(
second_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
"expected added count in second prompt: {second_prompt}"
);
assert!(
second_prompt.contains("- removed from the last successful Phase 2 run: 1"),
"expected removed count in second prompt: {second_prompt}"
);
assert!(
second_prompt.contains(&format!("- [added] thread_id={thread_b},")),
"expected thread B to be marked added: {second_prompt}"
);
assert!(
second_prompt.contains(&format!("- thread_id={thread_a},")),
"expected thread A to be marked removed: {second_prompt}"
prompt.contains("phase2_workspace_diff.md"),
"expected workspace diff file in prompt: {prompt}"
);
wait_for_phase2_success(db.as_ref(), thread_b).await?;
wait_for_phase2_workspace_reset(&memory_root).await?;
let raw_memories = tokio::fs::read_to_string(memory_root.join("raw_memories.md")).await?;
assert!(raw_memories.contains("raw memory B"));
assert!(raw_memories.contains("raw memory A"));
assert!(!raw_memories.contains("raw memory A"));
let rollout_summaries = read_rollout_summary_bodies(&memory_root).await?;
assert_eq!(rollout_summaries.len(), 2);
assert_eq!(rollout_summaries.len(), 1);
assert!(
rollout_summaries
.iter()
@@ -152,20 +105,20 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
assert!(
rollout_summaries
.iter()
.any(|summary| summary.contains("rollout summary A"))
.all(|summary| !summary.contains("rollout summary A"))
);
shutdown_test_codex(&second).await?;
shutdown_test_codex(&codex).await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn memories_startup_phase2_prunes_old_extension_resources_and_reports_them() -> Result<()> {
async fn memories_startup_phase2_prunes_old_extension_resources() -> Result<()> {
let server = start_mock_server().await;
let home = Arc::new(TempDir::new()?);
let db = init_state_db(&home).await?;
let now = Utc::now();
let thread_id = seed_stage1_output(
let _thread_id = seed_stage1_output(
db.as_ref(),
home.path(),
now - ChronoDuration::hours(1),
@@ -175,11 +128,11 @@ async fn memories_startup_phase2_prunes_old_extension_resources_and_reports_them
)
.await?;
let chronicle_resources = home.path().join("memories_extensions/chronicle/resources");
let chronicle_resources = home.path().join("memories/extensions/chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources).await?;
tokio::fs::write(
home.path()
.join("memories_extensions/chronicle/instructions.md"),
.join("memories/extensions/chronicle/instructions.md"),
"instructions",
)
.await?;
@@ -210,23 +163,11 @@ async fn memories_startup_phase2_prunes_old_extension_resources_and_reports_them
let prompt = phase2_prompt_text(&request);
assert!(
prompt.contains("Memory extension resources removed by retention pruning:"),
"expected extension resource prune report in prompt: {prompt}"
);
assert!(
prompt.contains("- retention window: 7 days"),
"expected retention window in prompt: {prompt}"
);
assert!(
prompt.contains("- extension: chronicle"),
"expected extension name in prompt: {prompt}"
);
assert!(
prompt.contains(&format!(" - resources/{old_file_name}")),
"expected old resource in prompt: {prompt}"
prompt.contains("phase2_workspace_diff.md"),
"expected workspace diff file in prompt: {prompt}"
);
wait_for_phase2_success(db.as_ref(), thread_id).await?;
wait_for_phase2_workspace_reset(&home.path().join("memories")).await?;
wait_for_file_removed(&old_file).await?;
assert!(
!tokio::fs::try_exists(&old_file).await?,
@@ -242,8 +183,8 @@ async fn memories_startup_phase2_prunes_old_extension_resources_and_reports_them
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn memories_startup_phase2_processes_old_extension_resources_without_stage1_input()
-> Result<()> {
async fn memories_startup_phase2_prunes_old_extension_resources_without_stage1_input() -> Result<()>
{
let server = start_mock_server().await;
let home = Arc::new(TempDir::new()?);
let db = init_state_db(&home).await?;
@@ -251,11 +192,11 @@ async fn memories_startup_phase2_processes_old_extension_resources_without_stage
.await?;
let now = Utc::now();
let chronicle_resources = home.path().join("memories_extensions/chronicle/resources");
let chronicle_resources = home.path().join("memories/extensions/chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources).await?;
tokio::fs::write(
home.path()
.join("memories_extensions/chronicle/instructions.md"),
.join("memories/extensions/chronicle/instructions.md"),
"instructions",
)
.await?;
@@ -281,189 +222,16 @@ async fn memories_startup_phase2_processes_old_extension_resources_without_stage
let prompt = phase2_prompt_text(&request);
assert!(
prompt.contains("- selected inputs this run: 0"),
"expected no selected raw inputs in prompt: {prompt}"
);
assert!(
prompt.contains("Memory extension resources removed by retention pruning:"),
"expected extension resource prune report in prompt: {prompt}"
);
assert!(
prompt.contains(&format!(" - resources/{old_file_name}")),
"expected old resource in prompt: {prompt}"
prompt.contains("phase2_workspace_diff.md"),
"expected workspace diff file in prompt: {prompt}"
);
wait_for_file_removed(&old_file).await?;
wait_for_phase2_workspace_reset(&home.path().join("memories")).await?;
shutdown_test_codex(&codex).await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn web_search_pollution_moves_selected_thread_into_removed_phase2_inputs() -> Result<()> {
let server = start_mock_server().await;
let home = Arc::new(TempDir::new()?);
let db = init_state_db(&home).await?;
let mut initial_builder = test_codex().with_home(home.clone()).with_config(|config| {
config
.features
.enable(Feature::Sqlite)
.expect("test config should allow feature update");
config
.features
.enable(Feature::MemoryTool)
.expect("test config should allow feature update");
config.memories.max_raw_memories_for_consolidation = 1;
config.memories.disable_on_external_context = true;
});
let initial = initial_builder.build(&server).await?;
mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-initial-1"),
ev_assistant_message("msg-initial-1", "initial turn complete"),
ev_completed("resp-initial-1"),
]),
)
.await;
initial.submit_turn("hello before memories").await?;
let rollout_path = initial
.session_configured
.rollout_path
.clone()
.expect("rollout path");
let thread_id = initial.session_configured.session_id;
let updated_at = {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
if let Some(metadata) = db.get_thread(thread_id).await? {
break metadata.updated_at;
}
assert!(
Instant::now() < deadline,
"timed out waiting for thread metadata for {thread_id}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
};
seed_stage1_output_for_existing_thread(
db.as_ref(),
thread_id,
updated_at.timestamp(),
"raw memory seeded for web search pollution",
"rollout summary seeded for web search pollution",
Some("pollution-rollout"),
)
.await?;
shutdown_test_codex(&initial).await?;
let responses = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("resp-phase2-1"),
ev_assistant_message("msg-phase2-1", "phase2 complete"),
ev_completed("resp-phase2-1"),
]),
sse(vec![
ev_response_created("resp-web-1"),
ev_web_search_call_done("ws-1", "completed", "weather seattle"),
ev_completed("resp-web-1"),
]),
],
)
.await;
let mut resumed_builder = test_codex().with_home(home.clone()).with_config(|config| {
config
.features
.enable(Feature::Sqlite)
.expect("test config should allow feature update");
config
.features
.enable(Feature::MemoryTool)
.expect("test config should allow feature update");
config.memories.max_raw_memories_for_consolidation = 1;
config.memories.disable_on_external_context = true;
});
let resumed = resumed_builder
.resume(&server, home.clone(), rollout_path.clone())
.await?;
let first_phase2_request = wait_for_request(&responses, /*expected_count*/ 1)
.await
.remove(0);
let first_phase2_prompt = phase2_prompt_text(&first_phase2_request);
assert!(
first_phase2_prompt.contains("- selected inputs this run: 1"),
"expected seeded thread to be selected before pollution: {first_phase2_prompt}"
);
assert!(
first_phase2_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
"expected seeded thread to be added before pollution: {first_phase2_prompt}"
);
assert!(
first_phase2_prompt.contains(&format!("- [added] thread_id={thread_id},")),
"expected selected thread in first phase2 prompt: {first_phase2_prompt}"
);
wait_for_phase2_success(db.as_ref(), thread_id).await?;
resumed
.submit_turn("search the web for weather seattle")
.await?;
assert_eq!(
{
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let memory_mode = db.get_thread_memory_mode(thread_id).await?;
if memory_mode.as_deref() == Some("polluted") {
break memory_mode;
}
assert!(
Instant::now() < deadline,
"timed out waiting for polluted memory mode for {thread_id}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
.as_deref(),
Some("polluted")
);
let selection = {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let selection = db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await?;
if selection.selected.is_empty()
&& selection.retained_thread_ids.is_empty()
&& selection.removed.len() == 1
&& selection.removed[0].thread_id == thread_id
{
break selection;
}
assert!(
Instant::now() < deadline,
"timed out waiting for polluted thread to move into removed phase2 inputs: \
{selection:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
};
assert_eq!(responses.requests().len(), 2);
assert!(selection.selected.is_empty());
assert_eq!(selection.retained_thread_ids, Vec::<ThreadId>::new());
assert_eq!(selection.removed.len(), 1);
assert_eq!(selection.removed[0].thread_id, thread_id);
shutdown_test_codex(&resumed).await?;
Ok(())
}
async fn build_test_codex(server: &wiremock::MockServer, home: Arc<TempDir>) -> Result<TestCodex> {
#[allow(clippy::expect_used)]
let mut builder = test_codex().with_home(home).with_config(|config| {
@@ -560,30 +328,22 @@ fn phase2_prompt_text(request: &ResponsesRequest) -> String {
request
.message_input_texts("user")
.into_iter()
.find(|text| text.contains("Current selected Phase 1 inputs:"))
.find(|text| text.contains("Memory workspace diff:"))
.expect("phase2 prompt text")
}
async fn wait_for_phase2_success(
db: &codex_state::StateRuntime,
expected_thread_id: ThreadId,
) -> Result<()> {
async fn wait_for_phase2_workspace_reset(memory_root: &Path) -> Result<()> {
wait_for_file_removed(&memory_root.join("phase2_workspace_diff.md")).await?;
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let selection = db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await?;
if selection.selected.len() == 1
&& selection.selected[0].thread_id == expected_thread_id
&& selection.retained_thread_ids == vec![expected_thread_id]
&& selection.removed.is_empty()
if let Ok(diff) = diff_since_latest_init(memory_root).await
&& !diff.has_changes()
{
return Ok(());
}
assert!(
Instant::now() < deadline,
"timed out waiting for phase2 success for {expected_thread_id}"
"timed out waiting for clean memory workspace baseline"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
+5 -3
@@ -3,9 +3,11 @@
Helpers for interacting with git, including patch application and worktree
snapshot utilities. The crate also exposes a lightweight baseline API for
internal directories that use git only as a resettable diff mechanism:
`reset_git_repository` replaces `root/.git` with a fresh one-commit baseline,
and `diff_since_latest_init` returns structured file changes plus a unified
diff from that baseline to the current directory contents.
`ensure_git_baseline_repository` preserves a usable `root/.git` baseline or
creates one when it is missing or unusable, `reset_git_repository` replaces
`root/.git` with a fresh one-commit baseline, and `diff_since_latest_init`
returns structured file changes plus a unified diff from that baseline to the
current directory contents.
```rust,no_run
use std::path::Path;
+70 -4
@@ -12,6 +12,8 @@ use std::path::Path;
use std::path::PathBuf;
use tokio::task;
use crate::operations::run_git_for_status;
const BASELINE_COMMIT_MESSAGE: &str =
"Initialize Codex git baseline\n\nCo-authored-by: Codex <noreply@openai.com>";
@@ -65,18 +67,40 @@ struct GitBaselineFileEntry {
/// This is intentionally destructive for `root/.git`. It is meant for internal directories where
/// git is used only as a baseline/diff implementation detail, not for user repositories.
pub async fn reset_git_repository(root: &Path) -> anyhow::Result<()> {
let root = root.to_path_buf();
task::spawn_blocking(move || reset_git_repository_sync(&root)).await?
}
/// Ensures `root` has a usable git baseline repository.
///
/// Existing usable `.git/` metadata is preserved. Missing or unusable metadata is replaced with a
/// fresh one-commit baseline.
pub async fn ensure_git_baseline_repository(root: &Path) -> anyhow::Result<()> {
let root = root.to_path_buf();
task::spawn_blocking(move || {
fs::create_dir_all(&root)
.with_context(|| format!("create git baseline root {}", root.display()))?;
remove_git_metadata(&root)?;
let repo = gix::init(&root).with_context(|| format!("init git repo {}", root.display()))?;
commit_current_tree(&repo, BASELINE_COMMIT_MESSAGE)?;
anyhow::Ok(())
if root.join(".git").is_dir()
&& let Ok(repo) = gix::open(&root)
&& head_file_entries(&repo).is_ok()
{
return Ok(());
}
reset_git_repository_sync(&root)
})
.await?
}
fn reset_git_repository_sync(root: &Path) -> anyhow::Result<()> {
fs::create_dir_all(root)
.with_context(|| format!("create git baseline root {}", root.display()))?;
remove_git_metadata(root)?;
let repo = gix::init(root).with_context(|| format!("init git repo {}", root.display()))?;
commit_current_tree(&repo, BASELINE_COMMIT_MESSAGE)?;
write_index_from_head(root)?;
Ok(())
}
/// Returns the diff between the latest baseline reset and the current directory contents.
pub async fn diff_since_latest_init(root: &Path) -> anyhow::Result<GitBaselineDiff> {
let root = root.to_path_buf();
@@ -130,6 +154,11 @@ fn commit_current_tree(repo: &gix::Repository, message: &str) -> anyhow::Result<
Ok(())
}
fn write_index_from_head(root: &Path) -> anyhow::Result<()> {
run_git_for_status(root, ["read-tree", "--reset", "HEAD"], /*env*/ None)
.context("write git baseline index from HEAD")
}
fn codex_signature() -> gix::actor::Signature {
gix::actor::Signature {
name: "Codex".into(),
@@ -501,8 +530,24 @@ mod tests {
use super::*;
use pretty_assertions::assert_eq;
use std::fs;
use std::process::Command;
use tempfile::TempDir;
fn git_stdout(root: &Path, args: &[&str]) -> String {
let output = Command::new("git")
.current_dir(root)
.args(args)
.output()
.expect("run git command");
assert!(
output.status.success(),
"git command failed: {args:?}\nstdout:\n{}\nstderr:\n{}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
);
String::from_utf8_lossy(&output.stdout).to_string()
}
#[tokio::test]
async fn reset_creates_fresh_baseline() {
let home = TempDir::new().expect("tempdir");
@@ -513,9 +558,30 @@ mod tests {
reset_git_repository(&root).await.expect("reset repo");
assert!(root.join(".git").is_dir());
assert!(root.join(".git/index").is_file());
let diff = diff_since_latest_init(&root).await.expect("diff");
assert!(!diff.has_changes());
assert_eq!(diff.unified_diff, "");
assert_eq!(git_stdout(&root, &["status", "--porcelain"]), "");
assert_eq!(git_stdout(&root, &["ls-files"]), "MEMORY.md\n");
}
#[tokio::test]
async fn ensure_recovers_from_unborn_repository() {
let home = TempDir::new().expect("tempdir");
let root = home.path().join("repo");
fs::create_dir_all(&root).expect("create root");
fs::write(root.join("MEMORY.md"), "memory").expect("write memory");
gix::init(&root).expect("init git repo without baseline commit");
ensure_git_baseline_repository(&root)
.await
.expect("ensure repo");
let diff = diff_since_latest_init(&root).await.expect("diff");
assert!(!diff.has_changes());
assert_eq!(git_stdout(&root, &["status", "--porcelain"]), "");
assert_eq!(git_stdout(&root, &["ls-files"]), "MEMORY.md\n");
}
#[tokio::test]
+1
@@ -17,6 +17,7 @@ pub use baseline::GitBaselineChange;
pub use baseline::GitBaselineChangeStatus;
pub use baseline::GitBaselineDiff;
pub use baseline::diff_since_latest_init;
pub use baseline::ensure_git_baseline_repository;
pub use baseline::reset_git_repository;
pub use branch::merge_base_with_head;
pub use codex_protocol::models::GhostCommit;
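Note on the baseline API exported above: the reset-then-diff contract can be modelled without git at all. The sketch below is an in-memory stand-in, not the crate's implementation (which uses `gix` and a real `.git` directory). `Baseline`, `Status`, `reset`, and `diff` are hypothetical names chosen for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical change kind, loosely analogous to a baseline change status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Added,
    Modified,
    Deleted,
}

/// In-memory stand-in for the one-commit baseline: path -> file contents.
#[derive(Debug, Default)]
struct Baseline {
    files: BTreeMap<String, String>,
}

impl Baseline {
    /// Analogue of a reset: drop the old snapshot and record the current contents.
    fn reset(&mut self, current: &BTreeMap<String, String>) {
        self.files = current.clone();
    }

    /// Analogue of a diff: compare the snapshot with the current contents.
    /// Results are sorted by path.
    fn diff(&self, current: &BTreeMap<String, String>) -> Vec<(String, Status)> {
        let mut changes = Vec::new();
        for (path, body) in current {
            match self.files.get(path) {
                None => changes.push((path.clone(), Status::Added)),
                Some(old) if old != body => changes.push((path.clone(), Status::Modified)),
                Some(_) => {}
            }
        }
        for path in self.files.keys() {
            if !current.contains_key(path) {
                changes.push((path.clone(), Status::Deleted));
            }
        }
        changes.sort_by(|a, b| a.0.cmp(&b.0));
        changes
    }
}
```

The property the tests in this PR rely on shows up here as well: immediately after a reset, the diff is empty, and it stays empty until the directory contents change.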
-2
@@ -14,7 +14,6 @@ mod runtime;
pub use model::LogEntry;
pub use model::LogQuery;
pub use model::LogRow;
pub use model::Phase2InputSelection;
pub use model::Phase2JobClaimOutcome;
/// Preferred entrypoint: owns configuration and metrics.
pub use runtime::StateRuntime;
@@ -42,7 +41,6 @@ pub use model::SortKey;
pub use model::Stage1JobClaim;
pub use model::Stage1JobClaimOutcome;
pub use model::Stage1Output;
pub use model::Stage1OutputRef;
pub use model::Stage1StartupClaimParams;
pub use model::ThreadGoal;
pub use model::ThreadGoalStatus;
+3 -30
@@ -22,21 +22,6 @@ pub struct Stage1Output {
pub generated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stage1OutputRef {
pub thread_id: ThreadId,
pub source_updated_at: DateTime<Utc>,
pub rollout_slug: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Phase2InputSelection {
pub selected: Vec<Stage1Output>,
pub previous_selected: Vec<Stage1Output>,
pub retained_thread_ids: Vec<ThreadId>,
pub removed: Vec<Stage1OutputRef>,
}
#[derive(Debug)]
pub(crate) struct Stage1OutputRow {
thread_id: String,
@@ -89,18 +74,6 @@ fn epoch_seconds_to_datetime(secs: i64) -> Result<DateTime<Utc>> {
.ok_or_else(|| anyhow::anyhow!("invalid unix timestamp: {secs}"))
}
pub(crate) fn stage1_output_ref_from_parts(
thread_id: String,
source_updated_at: i64,
rollout_slug: Option<String>,
) -> Result<Stage1OutputRef> {
Ok(Stage1OutputRef {
thread_id: ThreadId::try_from(thread_id)?,
source_updated_at: epoch_seconds_to_datetime(source_updated_at)?,
rollout_slug,
})
}
/// Result of trying to claim a stage-1 memory extraction job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Stage1JobClaimOutcome {
@@ -136,14 +109,14 @@ pub struct Stage1StartupClaimParams<'a> {
/// Result of trying to claim a phase-2 consolidation job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Phase2JobClaimOutcome {
/// The caller owns the global lock and should spawn consolidation.
/// The caller owns the global lock and may inspect the memory workspace.
Claimed {
ownership_token: String,
/// Snapshot of `input_watermark` at claim time.
input_watermark: i64,
},
/// The global job is not pending consolidation (or is already up to date).
SkippedNotDirty,
/// The global job is in retry backoff or has exhausted its retry budget.
SkippedRetryUnavailable,
/// Another worker currently owns a fresh global consolidation lease.
SkippedRunning,
}
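Note on the claim outcomes above: with this change, a successful claim means only that the caller may inspect the memory workspace, and whether consolidation actually runs depends on workspace dirtiness. A minimal sketch of that decision follows. It assumes the post-change enum has the three variants `Claimed`, `SkippedRetryUnavailable`, and `SkippedRunning`; the local enum is a simplified mirror, and `NextStep` and `next_step` are hypothetical names.

```rust
/// Simplified local mirror of the claim outcomes (assumed post-change shape).
#[derive(Debug, Clone, PartialEq, Eq)]
enum Phase2JobClaimOutcome {
    Claimed {
        ownership_token: String,
        input_watermark: i64,
    },
    SkippedRetryUnavailable,
    SkippedRunning,
}

/// Hypothetical summary of what the caller does next.
#[derive(Debug, PartialEq, Eq)]
enum NextStep {
    /// Lock held and the workspace differs from the baseline.
    RunConsolidation,
    /// Lock held but the workspace is clean; no subagent is needed.
    NothingToDo,
    /// Lock not acquired (retry backoff or another owner).
    Skip,
}

/// Decides the next step from the claim outcome and the workspace diff state.
/// The watermark is carried as bookkeeping and does not affect the decision.
fn next_step(outcome: &Phase2JobClaimOutcome, workspace_has_changes: bool) -> NextStep {
    match outcome {
        Phase2JobClaimOutcome::Claimed { .. } if workspace_has_changes => {
            NextStep::RunConsolidation
        }
        Phase2JobClaimOutcome::Claimed { .. } => NextStep::NothingToDo,
        Phase2JobClaimOutcome::SkippedRetryUnavailable
        | Phase2JobClaimOutcome::SkippedRunning => NextStep::Skip,
    }
}
```

In the real flow, `workspace_has_changes` would presumably come from something like `diff_since_latest_init(...).has_changes()` after the lock is claimed; that wiring is not shown in this excerpt.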
-3
@@ -19,12 +19,10 @@ pub use graph::DirectionalThreadSpawnEdgeStatus;
pub use log::LogEntry;
pub use log::LogQuery;
pub use log::LogRow;
pub use memories::Phase2InputSelection;
pub use memories::Phase2JobClaimOutcome;
pub use memories::Stage1JobClaim;
pub use memories::Stage1JobClaimOutcome;
pub use memories::Stage1Output;
pub use memories::Stage1OutputRef;
pub use memories::Stage1StartupClaimParams;
pub use thread_goal::ThreadGoal;
pub use thread_goal::ThreadGoalStatus;
@@ -40,7 +38,6 @@ pub use thread_metadata::ThreadsPage;
pub(crate) use agent_job::AgentJobItemRow;
pub(crate) use agent_job::AgentJobRow;
pub(crate) use memories::Stage1OutputRow;
pub(crate) use memories::stage1_output_ref_from_parts;
pub(crate) use thread_goal::ThreadGoalRow;
pub(crate) use thread_metadata::ThreadRow;
pub(crate) use thread_metadata::anchor_from_item;
+145 -196
@@ -3,7 +3,6 @@ use super::threads::push_thread_filters;
use super::threads::push_thread_order_and_limit;
use super::*;
use crate::SortDirection;
use crate::model::Phase2InputSelection;
use crate::model::Phase2JobClaimOutcome;
use crate::model::Stage1JobClaim;
use crate::model::Stage1JobClaimOutcome;
@@ -11,12 +10,10 @@ use crate::model::Stage1Output;
use crate::model::Stage1OutputRow;
use crate::model::Stage1StartupClaimParams;
use crate::model::ThreadRow;
use crate::model::stage1_output_ref_from_parts;
use chrono::Duration;
use sqlx::Executor;
use sqlx::QueryBuilder;
use sqlx::Sqlite;
use std::collections::HashSet;
use uuid::Uuid;
const JOB_KIND_MEMORY_STAGE1: &str = "memory_stage1";
@@ -331,8 +328,7 @@ WHERE thread_id IN (
Ok(rows_affected as usize)
}
/// Returns the current phase-2 input set along with its diff against the
/// last successful phase-2 selection.
/// Returns the current phase-2 input set.
///
/// Query behavior:
/// - current selection keeps only non-empty stage-1 outputs whose
@@ -342,22 +338,17 @@ WHERE thread_id IN (
/// - eligible rows are ordered by `usage_count DESC`,
/// `COALESCE(last_usage, source_updated_at) DESC`, `source_updated_at DESC`,
/// `thread_id DESC`
/// - previously selected rows are identified by `selected_for_phase2 = 1`
/// - `previous_selected` contains the current persisted rows that belonged
/// to the last successful phase-2 baseline, even if those threads are no
/// longer memory-eligible
/// - `retained_thread_ids` records which current rows still match the exact
/// snapshot selected in the last successful phase-2 run
/// - removed rows are previously selected rows that are still present in
/// `stage1_outputs` but are no longer in the current selection, including
/// threads that are no longer memory-eligible
///
/// The returned rows are the complete Phase 2 filesystem input. Phase 2
/// syncs these rows directly; deletions are represented by the workspace
/// diff against the previous successful memory baseline.
pub async fn get_phase2_input_selection(
&self,
n: usize,
max_unused_days: i64,
) -> anyhow::Result<Phase2InputSelection> {
) -> anyhow::Result<Vec<Stage1Output>> {
if n == 0 {
return Ok(Phase2InputSelection::default());
return Ok(Vec::new());
}
let cutoff = (Utc::now() - Duration::days(max_unused_days.max(0))).timestamp();
@@ -372,9 +363,7 @@ SELECT
so.rollout_slug,
so.generated_at,
COALESCE(t.cwd, '') AS cwd,
t.git_branch AS git_branch,
so.selected_for_phase2,
so.selected_for_phase2_source_updated_at
t.git_branch AS git_branch
FROM stage1_outputs AS so
LEFT JOIN threads AS t
ON t.id = so.thread_id
@@ -398,70 +387,14 @@ LIMIT ?
.fetch_all(self.pool.as_ref())
.await?;
let mut current_thread_ids = HashSet::with_capacity(current_rows.len());
let mut selected = Vec::with_capacity(current_rows.len());
let mut retained_thread_ids = Vec::new();
for row in current_rows {
let thread_id = row.try_get::<String, _>("thread_id")?;
current_thread_ids.insert(thread_id.clone());
let source_updated_at = row.try_get::<i64, _>("source_updated_at")?;
if row.try_get::<i64, _>("selected_for_phase2")? != 0
&& row.try_get::<Option<i64>, _>("selected_for_phase2_source_updated_at")?
== Some(source_updated_at)
{
retained_thread_ids.push(ThreadId::try_from(thread_id.clone())?);
}
selected.push(Stage1Output::try_from(Stage1OutputRow::try_from_row(
&row,
)?)?);
}
let previous_rows = sqlx::query(
r#"
SELECT
so.thread_id,
COALESCE(t.rollout_path, '') AS rollout_path,
so.source_updated_at,
so.raw_memory,
so.rollout_summary,
so.rollout_slug,
so.generated_at,
COALESCE(t.cwd, '') AS cwd,
t.git_branch AS git_branch
FROM stage1_outputs AS so
LEFT JOIN threads AS t
ON t.id = so.thread_id
WHERE so.selected_for_phase2 = 1
ORDER BY so.source_updated_at DESC, so.thread_id DESC
"#,
)
.fetch_all(self.pool.as_ref())
.await?;
let previous_selected = previous_rows
.iter()
.map(Stage1OutputRow::try_from_row)
.map(|row| row.and_then(Stage1Output::try_from))
.collect::<Result<Vec<_>, _>>()?;
let mut removed = Vec::new();
for row in previous_rows {
let thread_id = row.try_get::<String, _>("thread_id")?;
if current_thread_ids.contains(thread_id.as_str()) {
continue;
}
removed.push(stage1_output_ref_from_parts(
thread_id,
row.try_get("source_updated_at")?,
row.try_get("rollout_slug")?,
)?);
}
Ok(Phase2InputSelection {
selected,
previous_selected,
retained_thread_ids,
removed,
})
Ok(selected)
}
/// Marks a thread as polluted and enqueues phase-2 forgetting when the
@@ -909,19 +842,22 @@ WHERE kind = ? AND job_key = ?
/// Enqueues or advances the global phase-2 consolidation job watermark.
///
/// The underlying upsert keeps the job `running` when already running, resets
/// `pending/error` jobs to `pending`, and advances `input_watermark` so each
/// enqueue marks new consolidation work even when `source_updated_at` is
/// older than prior maxima.
/// `pending/error` jobs to `pending`, and advances `input_watermark` as
/// bookkeeping even when `source_updated_at` is older than prior maxima.
/// Phase 2 does not use this watermark as a dirty check; git workspace diffing
/// decides whether consolidation work exists after the lock is claimed.
pub async fn enqueue_global_consolidation(&self, input_watermark: i64) -> anyhow::Result<()> {
enqueue_global_consolidation_with_executor(self.pool.as_ref(), input_watermark).await
}
/// Attempts to claim the global phase-2 consolidation job.
/// Attempts to claim the global phase-2 consolidation lock.
///
/// Claim semantics:
/// - reads the singleton global job row (`kind='memory_consolidate_global'`)
/// - returns `SkippedNotDirty` when `input_watermark <= last_success_watermark`
/// - returns `SkippedNotDirty` when retries are exhausted or retry backoff is active
/// - creates and claims the singleton row when it does not exist yet
/// - does not use DB watermarks to decide whether Phase 2 has work; git workspace
/// dirtiness is the source of truth after the caller materializes inputs
/// - returns `SkippedRetryUnavailable` when retries are exhausted or retry backoff is active
/// - returns `SkippedRunning` when an active running lease exists
/// - otherwise updates the row to `running`, sets ownership + lease, and
/// returns `Claimed`
@@ -939,7 +875,7 @@ WHERE kind = ? AND job_key = ?
let existing_job = sqlx::query(
r#"
SELECT status, lease_until, retry_at, retry_remaining, input_watermark, last_success_watermark
SELECT status, lease_until, retry_at, retry_remaining, input_watermark
FROM jobs
WHERE kind = ? AND job_key = ?
"#,
@@ -950,18 +886,49 @@ WHERE kind = ? AND job_key = ?
.await?;
let Some(existing_job) = existing_job else {
let rows_affected = sqlx::query(
r#"
INSERT INTO jobs (
kind,
job_key,
status,
worker_id,
ownership_token,
started_at,
finished_at,
lease_until,
retry_at,
retry_remaining,
last_error,
input_watermark,
last_success_watermark
) VALUES (?, ?, 'running', ?, ?, ?, NULL, ?, NULL, ?, NULL, 0, 0)
"#,
)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
.bind(MEMORY_CONSOLIDATION_JOB_KEY)
.bind(worker_id.as_str())
.bind(ownership_token.as_str())
.bind(now)
.bind(lease_until)
.bind(DEFAULT_RETRY_REMAINING)
.execute(&mut *tx)
.await?
.rows_affected();
tx.commit().await?;
return Ok(Phase2JobClaimOutcome::SkippedNotDirty);
return if rows_affected == 0 {
Ok(Phase2JobClaimOutcome::SkippedRunning)
} else {
Ok(Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark: 0,
})
};
};
let input_watermark: Option<i64> = existing_job.try_get("input_watermark")?;
let input_watermark_value = input_watermark.unwrap_or(0);
let last_success_watermark: Option<i64> = existing_job.try_get("last_success_watermark")?;
if input_watermark_value <= last_success_watermark.unwrap_or(0) {
tx.commit().await?;
return Ok(Phase2JobClaimOutcome::SkippedNotDirty);
}
let status: String = existing_job.try_get("status")?;
let existing_lease_until: Option<i64> = existing_job.try_get("lease_until")?;
let retry_at: Option<i64> = existing_job.try_get("retry_at")?;
@@ -969,11 +936,11 @@ WHERE kind = ? AND job_key = ?
if retry_remaining <= 0 {
tx.commit().await?;
return Ok(Phase2JobClaimOutcome::SkippedNotDirty);
return Ok(Phase2JobClaimOutcome::SkippedRetryUnavailable);
}
if retry_at.is_some_and(|retry_at| retry_at > now) {
tx.commit().await?;
return Ok(Phase2JobClaimOutcome::SkippedNotDirty);
return Ok(Phase2JobClaimOutcome::SkippedRetryUnavailable);
}
if status == "running" && existing_lease_until.is_some_and(|lease_until| lease_until > now)
{
@@ -994,7 +961,6 @@ SET
retry_at = NULL,
last_error = NULL
WHERE kind = ? AND job_key = ?
AND input_watermark > COALESCE(last_success_watermark, 0)
AND (status != 'running' OR lease_until IS NULL OR lease_until <= ?)
AND (retry_at IS NULL OR retry_at <= ?)
AND retry_remaining > 0
@@ -1063,8 +1029,7 @@ WHERE kind = ? AND job_key = ?
/// `max(existing_last_success_watermark, completed_watermark)`
/// - rewrites `selected_for_phase2` so only the exact selected stage-1
/// snapshots remain marked as part of the latest successful phase-2
/// selection, and persists each selected snapshot's
/// `source_updated_at` for future retained-vs-added diffing
/// selection, and persists each selected snapshot's `source_updated_at`
pub async fn mark_global_phase2_job_succeeded(
&self,
ownership_token: &str,
@@ -2271,16 +2236,6 @@ WHERE kind = 'memory_stage1'
"no-output without an existing stage1 output should not enqueue phase2"
);
let claim_phase2 = runtime
.try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600)
.await
.expect("claim phase2");
assert_eq!(
claim_phase2,
Phase2JobClaimOutcome::SkippedNotDirty,
"phase2 should remain clean when no-output deleted nothing"
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
@@ -2350,7 +2305,7 @@ WHERE kind = 'memory_stage1'
)
.await
.expect("mark initial phase2 succeeded"),
"initial phase2 success should clear global dirty state"
"initial phase2 success should finalize the global job"
);
let no_output_claim = runtime
@@ -2505,7 +2460,7 @@ WHERE kind = 'memory_stage1'
}
#[tokio::test]
async fn phase2_global_consolidation_reruns_when_watermark_advances() {
async fn phase2_global_lock_can_be_reclaimed_after_success_without_new_watermark() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
@@ -2537,24 +2492,13 @@ WHERE kind = 'memory_stage1'
"phase2 success should finalize for current token"
);
let claim_up_to_date = runtime
let claim_after_success = runtime
.try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600)
.await
.expect("claim phase2 up-to-date");
assert_eq!(claim_up_to_date, Phase2JobClaimOutcome::SkippedNotDirty);
runtime
.enqueue_global_consolidation(/*input_watermark*/ 101)
.await
.expect("enqueue global consolidation again");
let claim_rerun = runtime
.try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600)
.await
.expect("claim phase2 rerun");
.expect("claim phase2 after success");
assert!(
matches!(claim_rerun, Phase2JobClaimOutcome::Claimed { .. }),
"advanced watermark should be claimable"
matches!(claim_after_success, Phase2JobClaimOutcome::Claimed { .. }),
"the DB claim is only a lock; git workspace diff decides whether there is work"
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
@@ -2801,7 +2745,7 @@ VALUES (?, ?, ?, ?, ?)
}
#[tokio::test]
async fn get_phase2_input_selection_reports_added_retained_and_removed_rows() {
async fn get_phase2_input_selection_returns_current_selected_rows() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
@@ -2895,28 +2839,19 @@ VALUES (?, ?, ?, ?, ?)
.await
.expect("load phase2 input selection");
assert_eq!(selection.selected.len(), 2);
assert_eq!(selection.previous_selected.len(), 2);
assert_eq!(selection.selected[0].thread_id, thread_id_c);
assert_eq!(selection.len(), 2);
assert_eq!(selection[0].thread_id, thread_id_c);
assert_eq!(
selection.selected[0].rollout_path,
selection[0].rollout_path,
codex_home.join(format!("rollout-{thread_id_c}.jsonl"))
);
assert_eq!(selection.selected[1].thread_id, thread_id_b);
assert_eq!(selection.retained_thread_ids, vec![thread_id_c]);
assert_eq!(selection.removed.len(), 1);
assert_eq!(selection.removed[0].thread_id, thread_id_a);
assert_eq!(
selection.removed[0].rollout_slug.as_deref(),
Some("rollout-a")
);
assert_eq!(selection[1].thread_id, thread_id_b);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn get_phase2_input_selection_marks_polluted_previous_selection_as_removed() {
async fn get_phase2_input_selection_excludes_polluted_previous_selection() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
@@ -3002,24 +2937,8 @@ VALUES (?, ?, ?, ?, ?)
.await
.expect("load phase2 input selection");
assert_eq!(selection.selected.len(), 1);
assert_eq!(selection.selected[0].thread_id, thread_id_enabled);
assert_eq!(selection.previous_selected.len(), 2);
assert!(
selection
.previous_selected
.iter()
.any(|item| item.thread_id == thread_id_enabled)
);
assert!(
selection
.previous_selected
.iter()
.any(|item| item.thread_id == thread_id_polluted)
);
assert_eq!(selection.retained_thread_ids, vec![thread_id_enabled]);
assert_eq!(selection.removed.len(), 1);
assert_eq!(selection.removed[0].thread_id, thread_id_polluted);
assert_eq!(selection.len(), 1);
assert_eq!(selection[0].thread_id, thread_id_enabled);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
@@ -3113,7 +3032,7 @@ VALUES (?, ?, ?, ?, ?)
}
#[tokio::test]
async fn get_phase2_input_selection_treats_regenerated_selected_rows_as_added() {
async fn get_phase2_input_selection_returns_regenerated_selected_rows() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
@@ -3213,12 +3132,9 @@ VALUES (?, ?, ?, ?, ?)
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 36_500)
.await
.expect("load phase2 input selection");
assert_eq!(selection.selected.len(), 1);
assert_eq!(selection.previous_selected.len(), 1);
assert_eq!(selection.selected[0].thread_id, thread_id);
assert_eq!(selection.selected[0].source_updated_at.timestamp(), 101);
assert!(selection.retained_thread_ids.is_empty());
assert!(selection.removed.is_empty());
assert_eq!(selection.len(), 1);
assert_eq!(selection[0].thread_id, thread_id);
assert_eq!(selection[0].source_updated_at.timestamp(), 101);
let (selected_for_phase2, selected_for_phase2_source_updated_at) =
sqlx::query_as::<_, (i64, Option<i64>)>(
@@ -3235,7 +3151,7 @@ VALUES (?, ?, ?, ?, ?)
}
#[tokio::test]
async fn get_phase2_input_selection_reports_regenerated_previous_selection_as_removed() {
async fn get_phase2_input_selection_uses_current_ranking_after_refreshes() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
@@ -3368,29 +3284,11 @@ VALUES (?, ?, ?, ?, ?)
.expect("load phase2 input selection");
assert_eq!(
selection
.selected
.iter()
.map(|output| output.thread_id)
.collect::<Vec<_>>(),
vec![thread_id_d, thread_id_c]
);
assert_eq!(
selection
.previous_selected
.iter()
.map(|output| output.thread_id)
.collect::<Vec<_>>(),
vec![thread_id_a, thread_id_b]
);
assert!(selection.retained_thread_ids.is_empty());
assert_eq!(
selection
.removed
.iter()
.map(|output| (output.thread_id, output.source_updated_at.timestamp()))
.collect::<Vec<_>>(),
vec![(thread_id_a, 102), (thread_id_b, 101)]
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
@@ -3527,7 +3425,8 @@ VALUES (?, ?, ?, ?, ?)
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 36_500)
.await
.expect("load phase2 input selection after refresh");
assert_eq!(selection.retained_thread_ids, vec![thread_id]);
assert_eq!(selection.len(), 1);
assert_eq!(selection[0].thread_id, thread_id);
let (selected_for_phase2, selected_for_phase2_source_updated_at) =
sqlx::query_as::<_, (i64, Option<i64>)>(
@@ -3657,9 +3556,8 @@ VALUES (?, ?, ?, ?, ?)
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 36_500)
.await
.expect("load phase2 input selection");
assert_eq!(selection.selected.len(), 1);
assert_eq!(selection.selected[0].source_updated_at.timestamp(), 101);
assert!(selection.retained_thread_ids.is_empty());
assert_eq!(selection.len(), 1);
assert_eq!(selection[0].source_updated_at.timestamp(), 101);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
@@ -3870,7 +3768,6 @@ VALUES (?, ?, ?, ?, ?)
assert_eq!(
selection
.selected
.iter()
.map(|output| output.thread_id)
.collect::<Vec<_>>(),
@@ -3967,7 +3864,6 @@ VALUES (?, ?, ?, ?, ?)
assert_eq!(
selection
.selected
.iter()
.map(|output| output.thread_id)
.collect::<Vec<_>>(),
@@ -4056,9 +3952,9 @@ VALUES (?, ?, ?, ?, ?)
.await
.expect("load phase2 input selection");
assert_eq!(selection.selected.len(), 1);
assert_eq!(selection.selected[0].thread_id, newer_thread);
assert_eq!(selection.selected[0].source_updated_at.timestamp(), 200);
assert_eq!(selection.len(), 1);
assert_eq!(selection[0].thread_id, newer_thread);
assert_eq!(selection[0].source_updated_at.timestamp(), 200);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
@@ -4411,6 +4307,59 @@ VALUES (?, ?, ?, ?, ?)
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn phase2_global_lock_creates_missing_job_row() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner a");
let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner b");
let claim = runtime
.try_claim_global_phase2_job(owner_a, /*lease_seconds*/ 3_600)
.await
.expect("claim global phase2 lock");
let ownership_token = match claim {
Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => {
assert_eq!(input_watermark, 0);
ownership_token
}
other => panic!("unexpected phase2 lock claim outcome: {other:?}"),
};
let second_claim = runtime
.try_claim_global_phase2_job(owner_b, /*lease_seconds*/ 3_600)
.await
.expect("claim global phase2 lock from second owner");
assert_eq!(second_claim, Phase2JobClaimOutcome::SkippedRunning);
assert!(
runtime
.mark_global_phase2_job_succeeded(
ownership_token.as_str(),
/*completed_watermark*/ 0,
&[]
)
.await
.expect("mark phase2 lock success")
);
let claim_after_success = runtime
.try_claim_global_phase2_job(owner_b, /*lease_seconds*/ 3_600)
.await
.expect("claim global phase2 lock after success");
assert!(
matches!(claim_after_success, Phase2JobClaimOutcome::Claimed { .. }),
"git workspace diff, not the DB watermark, decides whether the claimed lock has work"
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn phase2_global_lock_stale_lease_allows_takeover() {
let codex_home = unique_temp_dir();
@@ -4487,7 +4436,7 @@ VALUES (?, ?, ?, ?, ?)
}
#[tokio::test]
async fn phase2_backfilled_inputs_below_last_success_still_become_dirty() {
async fn enqueue_global_consolidation_keeps_phase2_input_watermark_monotonic() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
@@ -4527,23 +4476,23 @@ VALUES (?, ?, ?, ?, ?)
runtime
.enqueue_global_consolidation(/*input_watermark*/ 400)
.await
.expect("enqueue backfilled consolidation");
.expect("enqueue lower-watermark consolidation");
let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner b");
let claim_b = runtime
.try_claim_global_phase2_job(owner_b, /*lease_seconds*/ 3_600)
.await
.expect("claim backfilled consolidation");
.expect("claim lower-watermark consolidation");
match claim_b {
Phase2JobClaimOutcome::Claimed {
input_watermark, ..
} => {
assert!(
input_watermark > 500,
"backfilled enqueue should advance dirty watermark beyond last success"
"lower-watermark enqueue should still advance the bookkeeping watermark"
);
}
other => panic!("unexpected backfilled phase2 claim outcome: {other:?}"),
other => panic!("unexpected lower-watermark phase2 claim outcome: {other:?}"),
}
let _ = tokio::fs::remove_dir_all(codex_home).await;
@@ -4608,7 +4557,7 @@ VALUES (?, ?, ?, ?, ?)
.try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
.await
.expect("claim after fallback failure");
assert_eq!(claim, Phase2JobClaimOutcome::SkippedNotDirty);
assert_eq!(claim, Phase2JobClaimOutcome::SkippedRetryUnavailable);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
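
Note on the claim semantics in this file: the updated doc comment on the global phase-2 claim describes a lock-style decision that no longer consults `last_success_watermark`. The sketch below restates that decision order as a pure function over an in-memory row, assuming the checks run in the order shown in the diff (retries, then backoff, then active lease). `JobRow`, `ClaimOutcome`, and `decide_claim` are hypothetical names used only for illustration. The real code runs these checks inside a SQL transaction and also returns an ownership token.

```rust
/// Hypothetical in-memory view of the singleton job row (illustration only,
/// not the real schema type).
#[derive(Debug, Clone, PartialEq)]
struct JobRow {
    status: String,
    lease_until: Option<i64>,
    retry_at: Option<i64>,
    retry_remaining: i64,
    input_watermark: Option<i64>,
}

/// Hypothetical outcome type mirroring the variant names used in the diff.
#[derive(Debug, PartialEq)]
enum ClaimOutcome {
    Claimed { input_watermark: i64 },
    SkippedRetryUnavailable,
    SkippedRunning,
}

/// Decides whether the lock can be claimed at time `now` (unix seconds).
/// Watermarks are carried through as bookkeeping and never block the claim.
fn decide_claim(row: Option<&JobRow>, now: i64) -> ClaimOutcome {
    // Missing row: the caller creates it and claims immediately.
    let Some(row) = row else {
        return ClaimOutcome::Claimed { input_watermark: 0 };
    };
    // Retries exhausted.
    if row.retry_remaining <= 0 {
        return ClaimOutcome::SkippedRetryUnavailable;
    }
    // Retry backoff still active.
    if row.retry_at.is_some_and(|retry_at| retry_at > now) {
        return ClaimOutcome::SkippedRetryUnavailable;
    }
    // Another worker holds a live lease.
    if row.status == "running" && row.lease_until.is_some_and(|lease| lease > now) {
        return ClaimOutcome::SkippedRunning;
    }
    // Otherwise the lock is claimable, including right after a success.
    ClaimOutcome::Claimed {
        input_watermark: row.input_watermark.unwrap_or(0),
    }
}
```

Under this reading, a row that has just succeeded and received no new enqueue is still claimable. Whether any consolidation work exists is then decided by the git workspace diff after the caller materializes inputs.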
+1 -2
@@ -1579,7 +1579,7 @@ async fn reset_memories_clears_local_memory_directories() -> Result<()> {
app.config.sqlite_home = codex_home.path().to_path_buf();
let memory_root = codex_home.path().join("memories");
let extensions_root = codex_home.path().join("memories_extensions");
let extensions_root = memory_root.join("extensions");
std::fs::create_dir_all(memory_root.join("rollout_summaries"))?;
std::fs::create_dir_all(&extensions_root)?;
std::fs::write(memory_root.join("MEMORY.md"), "stale memory\n")?;
@@ -1594,7 +1594,6 @@ async fn reset_memories_clears_local_memory_directories() -> Result<()> {
app.reset_memories_with_app_server(&mut app_server).await;
assert_eq!(std::fs::read_dir(&memory_root)?.count(), 0);
assert_eq!(std::fs::read_dir(&extensions_root)?.count(), 0);
app_server.shutdown().await?;
Ok(())