diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 519ea50fd..13f5bb7eb 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -542,7 +542,6 @@ pub(crate) enum ApiVersion { #[derive(Clone)] struct ListenerTaskContext { - auth_manager: Arc, thread_manager: Arc, thread_state_manager: ThreadStateManager, outgoing: Arc, @@ -2415,7 +2414,6 @@ impl CodexMessageProcessor { ); typesafe_overrides.ephemeral = ephemeral; let listener_task_context = ListenerTaskContext { - auth_manager: Arc::clone(&self.auth_manager), thread_manager: Arc::clone(&self.thread_manager), thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), @@ -2615,7 +2613,6 @@ impl CodexMessageProcessor { .collect() }; let core_dynamic_tool_count = core_dynamic_tools.len(); - let memory_config = Arc::new(config.clone()); let NewThread { thread_id, @@ -2669,14 +2666,6 @@ impl CodexMessageProcessor { otel.name = "app_server.thread_start.config_snapshot", )) .await; - codex_memories_write::start_memories_startup_task( - Arc::clone(&listener_task_context.thread_manager), - Arc::clone(&listener_task_context.auth_manager), - thread_id, - Arc::clone(&thread), - Arc::clone(&memory_config), - &config_snapshot.session_source, - ); let mut thread = build_thread_from_snapshot( thread_id, &config_snapshot, @@ -4178,7 +4167,6 @@ impl CodexMessageProcessor { let instruction_sources = Self::instruction_sources_from_config(&config).await; let response_history = thread_history.clone(); - let memory_config = Arc::new(config.clone()); match self .thread_manager @@ -4253,14 +4241,6 @@ impl CodexMessageProcessor { /*has_live_in_progress_turn*/ false, ); let config_snapshot = codex_thread.config_snapshot().await; - codex_memories_write::start_memories_startup_task( - Arc::clone(&self.thread_manager), - Arc::clone(&self.auth_manager), - thread_id, - 
Arc::clone(&codex_thread), - Arc::clone(&memory_config), - &config_snapshot.session_source, - ); let sandbox = thread_response_sandbox_policy( &config_snapshot.permission_profile, config_snapshot.cwd.as_path(), @@ -4766,7 +4746,6 @@ impl CodexMessageProcessor { let fallback_model_provider = config.model_provider_id.clone(); let instruction_sources = Self::instruction_sources_from_config(&config).await; let fork_thread_store = configured_thread_store(&config); - let memory_config = Arc::new(config.clone()); let NewThread { thread_id, @@ -4862,14 +4841,6 @@ impl CodexMessageProcessor { /*has_in_progress_turn*/ false, ); let config_snapshot = forked_thread.config_snapshot().await; - codex_memories_write::start_memories_startup_task( - Arc::clone(&self.thread_manager), - Arc::clone(&self.auth_manager), - thread_id, - Arc::clone(&forked_thread), - Arc::clone(&memory_config), - &config_snapshot.session_source, - ); let sandbox = thread_response_sandbox_policy( &config_snapshot.permission_profile, config_snapshot.cwd.as_path(), @@ -6363,12 +6334,12 @@ impl CodexMessageProcessor { ); return Err(error); } - let (_, thread) = self - .load_thread(&params.thread_id) - .await - .inspect_err(|error| { - self.track_error_response(&request_id, error, /*error_type*/ None); - })?; + let (thread_id, thread) = + self.load_thread(&params.thread_id) + .await + .inspect_err(|error| { + self.track_error_response(&request_id, error, /*error_type*/ None); + })?; Self::set_app_server_client_info( thread.as_ref(), app_server_client_name, @@ -6408,6 +6379,7 @@ impl CodexMessageProcessor { .into_iter() .map(V2UserInput::into_core) .collect(); + let turn_has_input = !mapped_items.is_empty(); let has_any_overrides = params.cwd.is_some() || params.approval_policy.is_some() @@ -6502,6 +6474,18 @@ impl CodexMessageProcessor { error })?; + if turn_has_input { + let config_snapshot = thread.config_snapshot().await; + codex_memories_write::start_memories_startup_task( + Arc::clone(&self.thread_manager), + 
Arc::clone(&self.auth_manager), + thread_id, + Arc::clone(&thread), + thread.config().await, + &config_snapshot.session_source, + ); + } + self.outgoing .record_request_turn_id(&request_id, &turn_id) .await; @@ -7188,7 +7172,6 @@ impl CodexMessageProcessor { ) -> Result { Self::ensure_conversation_listener_task( ListenerTaskContext { - auth_manager: Arc::clone(&self.auth_manager), thread_manager: Arc::clone(&self.thread_manager), thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), @@ -7306,7 +7289,6 @@ impl CodexMessageProcessor { ) -> Result<(), JSONRPCErrorError> { Self::ensure_listener_task_running_task( ListenerTaskContext { - auth_manager: Arc::clone(&self.auth_manager), thread_manager: Arc::clone(&self.thread_manager), thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), @@ -7355,7 +7337,6 @@ impl CodexMessageProcessor { thread_state.set_listener(cancel_tx, &conversation) }; let ListenerTaskContext { - auth_manager: _, outgoing, thread_manager, thread_state_manager, diff --git a/codex-rs/config/src/types.rs b/codex-rs/config/src/types.rs index 1b9066d07..f46a7fc00 100644 --- a/codex-rs/config/src/types.rs +++ b/codex-rs/config/src/types.rs @@ -29,8 +29,8 @@ use serde::Deserialize; use serde::Serialize; pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev"; -pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16; -pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30; +pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 2; +pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 10; pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6; pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 256; pub const DEFAULT_MEMORIES_MAX_UNUSED_DAYS: i64 = 30; diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index eb5afe57f..47f1b99ce 100644 --- a/codex-rs/core/src/codex_thread.rs +++ 
b/codex-rs/core/src/codex_thread.rs @@ -34,6 +34,7 @@ use codex_utils_absolute_path::AbsolutePathBuf; use rmcp::model::ReadResourceRequestParams; use std::collections::HashMap; use std::path::PathBuf; +use std::sync::Arc; use tokio::sync::Mutex; use tokio::sync::watch; @@ -376,6 +377,10 @@ impl CodexThread { self.codex.thread_config_snapshot().await } + pub async fn config(&self) -> Arc { + self.codex.session.get_config().await + } + pub async fn read_mcp_resource( &self, server: &str, diff --git a/codex-rs/memories/write/src/phase2.rs b/codex-rs/memories/write/src/phase2.rs index 7278207f3..2d092abfb 100644 --- a/codex-rs/memories/write/src/phase2.rs +++ b/codex-rs/memories/write/src/phase2.rs @@ -244,6 +244,9 @@ mod job { codex_state::Phase2JobClaimOutcome::SkippedRetryUnavailable => { return Err("skipped_retry_unavailable"); } + codex_state::Phase2JobClaimOutcome::SkippedCooldown => { + return Err("skipped_cooldown"); + } codex_state::Phase2JobClaimOutcome::SkippedRunning => return Err("skipped_running"), }; diff --git a/codex-rs/state/src/model/memories.rs b/codex-rs/state/src/model/memories.rs index ada9d4e1e..9bb34405a 100644 --- a/codex-rs/state/src/model/memories.rs +++ b/codex-rs/state/src/model/memories.rs @@ -117,6 +117,8 @@ pub enum Phase2JobClaimOutcome { }, /// The global job is in retry backoff. SkippedRetryUnavailable, + /// The global job completed recently enough that consolidation is cooling down. + SkippedCooldown, /// Another worker currently owns a fresh global consolidation lease. 
SkippedRunning, } diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 1ac5e202c..5b75225b1 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -19,6 +19,7 @@ use uuid::Uuid; const JOB_KIND_MEMORY_STAGE1: &str = "memory_stage1"; const JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL: &str = "memory_consolidate_global"; const MEMORY_CONSOLIDATION_JOB_KEY: &str = "global"; +const PHASE2_SUCCESS_COOLDOWN_SECONDS: i64 = 6 * 60 * 60; const DEFAULT_RETRY_REMAINING: i64 = 3; @@ -873,6 +874,8 @@ WHERE kind = ? AND job_key = ? /// dirtiness is the source of truth after the caller materializes inputs /// - returns `SkippedRetryUnavailable` when retry backoff is active /// - returns `SkippedRunning` when an active running lease exists + /// - returns `SkippedCooldown` when the latest successful run finished + /// within the phase-2 success cooldown /// - otherwise updates the row to `running`, sets ownership + lease, and /// returns `Claimed` pub async fn try_claim_global_phase2_job( @@ -882,6 +885,7 @@ WHERE kind = ? AND job_key = ? ) -> anyhow::Result { let now = Utc::now().timestamp(); let lease_until = now.saturating_add(lease_seconds.max(0)); + let cooldown_cutoff = now.saturating_sub(PHASE2_SUCCESS_COOLDOWN_SECONDS); let ownership_token = Uuid::new_v4().to_string(); let worker_id = worker_id.to_string(); @@ -889,7 +893,7 @@ WHERE kind = ? AND job_key = ? let existing_job = sqlx::query( r#" -SELECT status, lease_until, retry_at, input_watermark +SELECT status, lease_until, retry_at, input_watermark, finished_at, last_error FROM jobs WHERE kind = ? AND job_key = ? 
"#, @@ -946,6 +950,8 @@ INSERT INTO jobs ( let status: String = existing_job.try_get("status")?; let existing_lease_until: Option = existing_job.try_get("lease_until")?; let retry_at: Option = existing_job.try_get("retry_at")?; + let finished_at: Option = existing_job.try_get("finished_at")?; + let last_error: Option = existing_job.try_get("last_error")?; if retry_at.is_some_and(|retry_at| retry_at > now) { tx.commit().await?; return Ok(Phase2JobClaimOutcome::SkippedRetryUnavailable); @@ -955,6 +961,12 @@ INSERT INTO jobs ( tx.commit().await?; return Ok(Phase2JobClaimOutcome::SkippedRunning); } + if last_error.is_none() + && finished_at.is_some_and(|finished_at| finished_at > cooldown_cutoff) + { + tx.commit().await?; + return Ok(Phase2JobClaimOutcome::SkippedCooldown); + } let rows_affected = sqlx::query( r#" @@ -971,6 +983,7 @@ SET WHERE kind = ? AND job_key = ? AND (status != 'running' OR lease_until IS NULL OR lease_until <= ?) AND (retry_at IS NULL OR retry_at <= ?) + AND (last_error IS NOT NULL OR finished_at IS NULL OR finished_at <= ?) "#, ) .bind(worker_id.as_str()) @@ -981,6 +994,7 @@ WHERE kind = ? AND job_key = ? .bind(MEMORY_CONSOLIDATION_JOB_KEY) .bind(now) .bind(now) + .bind(cooldown_cutoff) .execute(&mut *tx) .await? .rows_affected(); @@ -1260,6 +1274,8 @@ ON CONFLICT(kind, job_key) DO UPDATE SET mod tests { use super::JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL; use super::JOB_KIND_MEMORY_STAGE1; + use super::MEMORY_CONSOLIDATION_JOB_KEY; + use super::PHASE2_SUCCESS_COOLDOWN_SECONDS; use super::StateRuntime; use super::test_support::test_thread_metadata; use super::test_support::unique_temp_dir; @@ -1278,6 +1294,16 @@ mod tests { ThreadId::from_string(value).expect("thread id") } + async fn age_phase2_success_beyond_cooldown(runtime: &StateRuntime) { + sqlx::query("UPDATE jobs SET finished_at = ? WHERE kind = ? 
AND job_key = ?") + .bind(Utc::now().timestamp() - PHASE2_SUCCESS_COOLDOWN_SECONDS - 1) + .bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL) + .bind(MEMORY_CONSOLIDATION_JOB_KEY) + .execute(runtime.pool.as_ref()) + .await + .expect("age phase2 success beyond cooldown"); + } + #[tokio::test] async fn stage1_claim_skips_when_up_to_date() { let codex_home = unique_temp_dir(); @@ -2363,6 +2389,7 @@ WHERE kind = 'memory_stage1' .expect("stage1 output count"); assert_eq!(output_row_count, 0); + age_phase2_success_beyond_cooldown(&runtime).await; let claim_phase2 = runtime .try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600) .await @@ -2486,7 +2513,7 @@ WHERE kind = 'memory_stage1' } #[tokio::test] - async fn phase2_global_lock_can_be_reclaimed_after_success_without_new_watermark() { + async fn phase2_global_lock_respects_success_cooldown() { let codex_home = unique_temp_dir(); let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await @@ -2522,10 +2549,27 @@ WHERE kind = 'memory_stage1' .try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600) .await .expect("claim phase2 after success"); - assert!( - matches!(claim_after_success, Phase2JobClaimOutcome::Claimed { .. 
}), - "the DB claim is only a lock; git workspace diff decides whether there is work" - ); + assert_eq!(claim_after_success, Phase2JobClaimOutcome::SkippedCooldown); + + runtime + .enqueue_global_consolidation(/*input_watermark*/ 101) + .await + .expect("enqueue global consolidation after success"); + let claim_after_enqueue = runtime + .try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600) + .await + .expect("claim phase2 after enqueue"); + assert_eq!(claim_after_enqueue, Phase2JobClaimOutcome::SkippedCooldown); + + age_phase2_success_beyond_cooldown(&runtime).await; + let claim_after_cooldown = runtime + .try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600) + .await + .expect("claim phase2 after cooldown"); + assert!(matches!( + claim_after_cooldown, + Phase2JobClaimOutcome::Claimed { .. } + )); let _ = tokio::fs::remove_dir_all(codex_home).await; } @@ -3127,6 +3171,7 @@ VALUES (?, ?, ?, ?, ?) "thread should transition to polluted" ); + age_phase2_success_beyond_cooldown(&runtime).await; let next_claim = runtime .try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600) .await @@ -3495,6 +3540,7 @@ VALUES (?, ?, ?, ?, ?) "refreshed stage1 success should persist output" ); + age_phase2_success_beyond_cooldown(&runtime).await; let second_phase2_claim = runtime .try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600) .await @@ -4457,10 +4503,7 @@ VALUES (?, ?, ?, ?, ?) .try_claim_global_phase2_job(owner_b, /*lease_seconds*/ 3_600) .await .expect("claim global phase2 lock after success"); - assert!( - matches!(claim_after_success, Phase2JobClaimOutcome::Claimed { .. }), - "git workspace diff, not the DB watermark, decides whether the claimed lock has work" - ); + assert_eq!(claim_after_success, Phase2JobClaimOutcome::SkippedCooldown); let _ = tokio::fs::remove_dir_all(codex_home).await; } @@ -4583,6 +4626,7 @@ VALUES (?, ?, ?, ?, ?) 
.await .expect("enqueue lower-watermark consolidation"); + age_phase2_success_beyond_cooldown(&runtime).await; let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner b"); let claim_b = runtime .try_claim_global_phase2_job(owner_b, /*lease_seconds*/ 3_600)