mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
feat: trigger memories from user turns with cooldown (#19970)
## Why

Memory startup was tied to thread lifecycle events such as create, load, and fork. That can run memory work before a thread receives real user input, and it makes startup cost scale with thread management instead of actual turns.

Moving the trigger to `thread/sendInput` keeps memory startup aligned with the first real user turn and lets it use the current thread config at turn time. The idea is to prevent ghost cost from pre-warming triggered by the app.

Turn-based startup can also make global phase-2 consolidation easier to request repeatedly, so this adds a success cooldown and tightens the default startup scan window.

## What Changed

- Start `codex_memories_write::start_memories_startup_task` after a non-empty `thread/sendInput` turn is submitted, instead of from thread create/load/fork paths: https://github.com/openai/codex/blob/d4a6885b7829e2fd2ec7a09355e4f75ebe1d1fe3/codex-rs/app-server/src/codex_message_processor.rs#L6477-L6487
- Expose `CodexThread::config()` so app-server can pass the live config into memory startup at turn time.
- Add a six-hour successful-run cooldown for global phase-2 consolidation via `SkippedCooldown`: https://github.com/openai/codex/blob/d4a6885b7829e2fd2ec7a09355e4f75ebe1d1fe3/codex-rs/state/src/runtime/memories.rs#L963-L966
- Reduce memory startup defaults to at most 2 rollouts over 10 days: https://github.com/openai/codex/blob/d4a6885b7829e2fd2ec7a09355e4f75ebe1d1fe3/codex-rs/config/src/types.rs#L31-L34

## Verification

Updated the memory runtime coverage around phase-2 reclaim behavior, including `phase2_global_lock_respects_success_cooldown`.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
committed by
GitHub
Unverified
parent
fa127be25f
commit
a9e5c34083
@@ -542,7 +542,6 @@ pub(crate) enum ApiVersion {
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ListenerTaskContext {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
thread_state_manager: ThreadStateManager,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
@@ -2415,7 +2414,6 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
typesafe_overrides.ephemeral = ephemeral;
|
||||
let listener_task_context = ListenerTaskContext {
|
||||
auth_manager: Arc::clone(&self.auth_manager),
|
||||
thread_manager: Arc::clone(&self.thread_manager),
|
||||
thread_state_manager: self.thread_state_manager.clone(),
|
||||
outgoing: Arc::clone(&self.outgoing),
|
||||
@@ -2615,7 +2613,6 @@ impl CodexMessageProcessor {
|
||||
.collect()
|
||||
};
|
||||
let core_dynamic_tool_count = core_dynamic_tools.len();
|
||||
let memory_config = Arc::new(config.clone());
|
||||
|
||||
let NewThread {
|
||||
thread_id,
|
||||
@@ -2669,14 +2666,6 @@ impl CodexMessageProcessor {
|
||||
otel.name = "app_server.thread_start.config_snapshot",
|
||||
))
|
||||
.await;
|
||||
codex_memories_write::start_memories_startup_task(
|
||||
Arc::clone(&listener_task_context.thread_manager),
|
||||
Arc::clone(&listener_task_context.auth_manager),
|
||||
thread_id,
|
||||
Arc::clone(&thread),
|
||||
Arc::clone(&memory_config),
|
||||
&config_snapshot.session_source,
|
||||
);
|
||||
let mut thread = build_thread_from_snapshot(
|
||||
thread_id,
|
||||
&config_snapshot,
|
||||
@@ -4178,7 +4167,6 @@ impl CodexMessageProcessor {
|
||||
|
||||
let instruction_sources = Self::instruction_sources_from_config(&config).await;
|
||||
let response_history = thread_history.clone();
|
||||
let memory_config = Arc::new(config.clone());
|
||||
|
||||
match self
|
||||
.thread_manager
|
||||
@@ -4253,14 +4241,6 @@ impl CodexMessageProcessor {
|
||||
/*has_live_in_progress_turn*/ false,
|
||||
);
|
||||
let config_snapshot = codex_thread.config_snapshot().await;
|
||||
codex_memories_write::start_memories_startup_task(
|
||||
Arc::clone(&self.thread_manager),
|
||||
Arc::clone(&self.auth_manager),
|
||||
thread_id,
|
||||
Arc::clone(&codex_thread),
|
||||
Arc::clone(&memory_config),
|
||||
&config_snapshot.session_source,
|
||||
);
|
||||
let sandbox = thread_response_sandbox_policy(
|
||||
&config_snapshot.permission_profile,
|
||||
config_snapshot.cwd.as_path(),
|
||||
@@ -4766,7 +4746,6 @@ impl CodexMessageProcessor {
|
||||
let fallback_model_provider = config.model_provider_id.clone();
|
||||
let instruction_sources = Self::instruction_sources_from_config(&config).await;
|
||||
let fork_thread_store = configured_thread_store(&config);
|
||||
let memory_config = Arc::new(config.clone());
|
||||
|
||||
let NewThread {
|
||||
thread_id,
|
||||
@@ -4862,14 +4841,6 @@ impl CodexMessageProcessor {
|
||||
/*has_in_progress_turn*/ false,
|
||||
);
|
||||
let config_snapshot = forked_thread.config_snapshot().await;
|
||||
codex_memories_write::start_memories_startup_task(
|
||||
Arc::clone(&self.thread_manager),
|
||||
Arc::clone(&self.auth_manager),
|
||||
thread_id,
|
||||
Arc::clone(&forked_thread),
|
||||
Arc::clone(&memory_config),
|
||||
&config_snapshot.session_source,
|
||||
);
|
||||
let sandbox = thread_response_sandbox_policy(
|
||||
&config_snapshot.permission_profile,
|
||||
config_snapshot.cwd.as_path(),
|
||||
@@ -6363,12 +6334,12 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
return Err(error);
|
||||
}
|
||||
let (_, thread) = self
|
||||
.load_thread(&params.thread_id)
|
||||
.await
|
||||
.inspect_err(|error| {
|
||||
self.track_error_response(&request_id, error, /*error_type*/ None);
|
||||
})?;
|
||||
let (thread_id, thread) =
|
||||
self.load_thread(&params.thread_id)
|
||||
.await
|
||||
.inspect_err(|error| {
|
||||
self.track_error_response(&request_id, error, /*error_type*/ None);
|
||||
})?;
|
||||
Self::set_app_server_client_info(
|
||||
thread.as_ref(),
|
||||
app_server_client_name,
|
||||
@@ -6408,6 +6379,7 @@ impl CodexMessageProcessor {
|
||||
.into_iter()
|
||||
.map(V2UserInput::into_core)
|
||||
.collect();
|
||||
let turn_has_input = !mapped_items.is_empty();
|
||||
|
||||
let has_any_overrides = params.cwd.is_some()
|
||||
|| params.approval_policy.is_some()
|
||||
@@ -6502,6 +6474,18 @@ impl CodexMessageProcessor {
|
||||
error
|
||||
})?;
|
||||
|
||||
if turn_has_input {
|
||||
let config_snapshot = thread.config_snapshot().await;
|
||||
codex_memories_write::start_memories_startup_task(
|
||||
Arc::clone(&self.thread_manager),
|
||||
Arc::clone(&self.auth_manager),
|
||||
thread_id,
|
||||
Arc::clone(&thread),
|
||||
thread.config().await,
|
||||
&config_snapshot.session_source,
|
||||
);
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.record_request_turn_id(&request_id, &turn_id)
|
||||
.await;
|
||||
@@ -7188,7 +7172,6 @@ impl CodexMessageProcessor {
|
||||
) -> Result<EnsureConversationListenerResult, JSONRPCErrorError> {
|
||||
Self::ensure_conversation_listener_task(
|
||||
ListenerTaskContext {
|
||||
auth_manager: Arc::clone(&self.auth_manager),
|
||||
thread_manager: Arc::clone(&self.thread_manager),
|
||||
thread_state_manager: self.thread_state_manager.clone(),
|
||||
outgoing: Arc::clone(&self.outgoing),
|
||||
@@ -7306,7 +7289,6 @@ impl CodexMessageProcessor {
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
Self::ensure_listener_task_running_task(
|
||||
ListenerTaskContext {
|
||||
auth_manager: Arc::clone(&self.auth_manager),
|
||||
thread_manager: Arc::clone(&self.thread_manager),
|
||||
thread_state_manager: self.thread_state_manager.clone(),
|
||||
outgoing: Arc::clone(&self.outgoing),
|
||||
@@ -7355,7 +7337,6 @@ impl CodexMessageProcessor {
|
||||
thread_state.set_listener(cancel_tx, &conversation)
|
||||
};
|
||||
let ListenerTaskContext {
|
||||
auth_manager: _,
|
||||
outgoing,
|
||||
thread_manager,
|
||||
thread_state_manager,
|
||||
|
||||
@@ -29,8 +29,8 @@ use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev";
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16;
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 2;
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 10;
|
||||
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
|
||||
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 256;
|
||||
pub const DEFAULT_MEMORIES_MAX_UNUSED_DAYS: i64 = 30;
|
||||
|
||||
@@ -34,6 +34,7 @@ use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use rmcp::model::ReadResourceRequestParams;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::watch;
|
||||
|
||||
@@ -376,6 +377,10 @@ impl CodexThread {
|
||||
self.codex.thread_config_snapshot().await
|
||||
}
|
||||
|
||||
pub async fn config(&self) -> Arc<crate::config::Config> {
|
||||
self.codex.session.get_config().await
|
||||
}
|
||||
|
||||
pub async fn read_mcp_resource(
|
||||
&self,
|
||||
server: &str,
|
||||
|
||||
@@ -244,6 +244,9 @@ mod job {
|
||||
codex_state::Phase2JobClaimOutcome::SkippedRetryUnavailable => {
|
||||
return Err("skipped_retry_unavailable");
|
||||
}
|
||||
codex_state::Phase2JobClaimOutcome::SkippedCooldown => {
|
||||
return Err("skipped_cooldown");
|
||||
}
|
||||
codex_state::Phase2JobClaimOutcome::SkippedRunning => return Err("skipped_running"),
|
||||
};
|
||||
|
||||
|
||||
@@ -117,6 +117,8 @@ pub enum Phase2JobClaimOutcome {
|
||||
},
|
||||
/// The global job is in retry backoff.
|
||||
SkippedRetryUnavailable,
|
||||
/// The global job completed recently enough that consolidation is cooling down.
|
||||
SkippedCooldown,
|
||||
/// Another worker currently owns a fresh global consolidation lease.
|
||||
SkippedRunning,
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use uuid::Uuid;
|
||||
const JOB_KIND_MEMORY_STAGE1: &str = "memory_stage1";
|
||||
const JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL: &str = "memory_consolidate_global";
|
||||
const MEMORY_CONSOLIDATION_JOB_KEY: &str = "global";
|
||||
const PHASE2_SUCCESS_COOLDOWN_SECONDS: i64 = 6 * 60 * 60;
|
||||
|
||||
const DEFAULT_RETRY_REMAINING: i64 = 3;
|
||||
|
||||
@@ -873,6 +874,8 @@ WHERE kind = ? AND job_key = ?
|
||||
/// dirtiness is the source of truth after the caller materializes inputs
|
||||
/// - returns `SkippedRetryUnavailable` when retry backoff is active
|
||||
/// - returns `SkippedRunning` when an active running lease exists
|
||||
/// - returns `SkippedCooldown` when the latest successful run finished
|
||||
/// within the phase-2 success cooldown
|
||||
/// - otherwise updates the row to `running`, sets ownership + lease, and
|
||||
/// returns `Claimed`
|
||||
pub async fn try_claim_global_phase2_job(
|
||||
@@ -882,6 +885,7 @@ WHERE kind = ? AND job_key = ?
|
||||
) -> anyhow::Result<Phase2JobClaimOutcome> {
|
||||
let now = Utc::now().timestamp();
|
||||
let lease_until = now.saturating_add(lease_seconds.max(0));
|
||||
let cooldown_cutoff = now.saturating_sub(PHASE2_SUCCESS_COOLDOWN_SECONDS);
|
||||
let ownership_token = Uuid::new_v4().to_string();
|
||||
let worker_id = worker_id.to_string();
|
||||
|
||||
@@ -889,7 +893,7 @@ WHERE kind = ? AND job_key = ?
|
||||
|
||||
let existing_job = sqlx::query(
|
||||
r#"
|
||||
SELECT status, lease_until, retry_at, input_watermark
|
||||
SELECT status, lease_until, retry_at, input_watermark, finished_at, last_error
|
||||
FROM jobs
|
||||
WHERE kind = ? AND job_key = ?
|
||||
"#,
|
||||
@@ -946,6 +950,8 @@ INSERT INTO jobs (
|
||||
let status: String = existing_job.try_get("status")?;
|
||||
let existing_lease_until: Option<i64> = existing_job.try_get("lease_until")?;
|
||||
let retry_at: Option<i64> = existing_job.try_get("retry_at")?;
|
||||
let finished_at: Option<i64> = existing_job.try_get("finished_at")?;
|
||||
let last_error: Option<String> = existing_job.try_get("last_error")?;
|
||||
if retry_at.is_some_and(|retry_at| retry_at > now) {
|
||||
tx.commit().await?;
|
||||
return Ok(Phase2JobClaimOutcome::SkippedRetryUnavailable);
|
||||
@@ -955,6 +961,12 @@ INSERT INTO jobs (
|
||||
tx.commit().await?;
|
||||
return Ok(Phase2JobClaimOutcome::SkippedRunning);
|
||||
}
|
||||
if last_error.is_none()
|
||||
&& finished_at.is_some_and(|finished_at| finished_at > cooldown_cutoff)
|
||||
{
|
||||
tx.commit().await?;
|
||||
return Ok(Phase2JobClaimOutcome::SkippedCooldown);
|
||||
}
|
||||
|
||||
let rows_affected = sqlx::query(
|
||||
r#"
|
||||
@@ -971,6 +983,7 @@ SET
|
||||
WHERE kind = ? AND job_key = ?
|
||||
AND (status != 'running' OR lease_until IS NULL OR lease_until <= ?)
|
||||
AND (retry_at IS NULL OR retry_at <= ?)
|
||||
AND (last_error IS NOT NULL OR finished_at IS NULL OR finished_at <= ?)
|
||||
"#,
|
||||
)
|
||||
.bind(worker_id.as_str())
|
||||
@@ -981,6 +994,7 @@ WHERE kind = ? AND job_key = ?
|
||||
.bind(MEMORY_CONSOLIDATION_JOB_KEY)
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.bind(cooldown_cutoff)
|
||||
.execute(&mut *tx)
|
||||
.await?
|
||||
.rows_affected();
|
||||
@@ -1260,6 +1274,8 @@ ON CONFLICT(kind, job_key) DO UPDATE SET
|
||||
mod tests {
|
||||
use super::JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL;
|
||||
use super::JOB_KIND_MEMORY_STAGE1;
|
||||
use super::MEMORY_CONSOLIDATION_JOB_KEY;
|
||||
use super::PHASE2_SUCCESS_COOLDOWN_SECONDS;
|
||||
use super::StateRuntime;
|
||||
use super::test_support::test_thread_metadata;
|
||||
use super::test_support::unique_temp_dir;
|
||||
@@ -1278,6 +1294,16 @@ mod tests {
|
||||
ThreadId::from_string(value).expect("thread id")
|
||||
}
|
||||
|
||||
async fn age_phase2_success_beyond_cooldown(runtime: &StateRuntime) {
|
||||
sqlx::query("UPDATE jobs SET finished_at = ? WHERE kind = ? AND job_key = ?")
|
||||
.bind(Utc::now().timestamp() - PHASE2_SUCCESS_COOLDOWN_SECONDS - 1)
|
||||
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)
|
||||
.bind(MEMORY_CONSOLIDATION_JOB_KEY)
|
||||
.execute(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("age phase2 success beyond cooldown");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stage1_claim_skips_when_up_to_date() {
|
||||
let codex_home = unique_temp_dir();
|
||||
@@ -2363,6 +2389,7 @@ WHERE kind = 'memory_stage1'
|
||||
.expect("stage1 output count");
|
||||
assert_eq!(output_row_count, 0);
|
||||
|
||||
age_phase2_success_beyond_cooldown(&runtime).await;
|
||||
let claim_phase2 = runtime
|
||||
.try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600)
|
||||
.await
|
||||
@@ -2486,7 +2513,7 @@ WHERE kind = 'memory_stage1'
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn phase2_global_lock_can_be_reclaimed_after_success_without_new_watermark() {
|
||||
async fn phase2_global_lock_respects_success_cooldown() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
|
||||
.await
|
||||
@@ -2522,10 +2549,27 @@ WHERE kind = 'memory_stage1'
|
||||
.try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600)
|
||||
.await
|
||||
.expect("claim phase2 after success");
|
||||
assert!(
|
||||
matches!(claim_after_success, Phase2JobClaimOutcome::Claimed { .. }),
|
||||
"the DB claim is only a lock; git workspace diff decides whether there is work"
|
||||
);
|
||||
assert_eq!(claim_after_success, Phase2JobClaimOutcome::SkippedCooldown);
|
||||
|
||||
runtime
|
||||
.enqueue_global_consolidation(/*input_watermark*/ 101)
|
||||
.await
|
||||
.expect("enqueue global consolidation after success");
|
||||
let claim_after_enqueue = runtime
|
||||
.try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600)
|
||||
.await
|
||||
.expect("claim phase2 after enqueue");
|
||||
assert_eq!(claim_after_enqueue, Phase2JobClaimOutcome::SkippedCooldown);
|
||||
|
||||
age_phase2_success_beyond_cooldown(&runtime).await;
|
||||
let claim_after_cooldown = runtime
|
||||
.try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600)
|
||||
.await
|
||||
.expect("claim phase2 after cooldown");
|
||||
assert!(matches!(
|
||||
claim_after_cooldown,
|
||||
Phase2JobClaimOutcome::Claimed { .. }
|
||||
));
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
@@ -3127,6 +3171,7 @@ VALUES (?, ?, ?, ?, ?)
|
||||
"thread should transition to polluted"
|
||||
);
|
||||
|
||||
age_phase2_success_beyond_cooldown(&runtime).await;
|
||||
let next_claim = runtime
|
||||
.try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600)
|
||||
.await
|
||||
@@ -3495,6 +3540,7 @@ VALUES (?, ?, ?, ?, ?)
|
||||
"refreshed stage1 success should persist output"
|
||||
);
|
||||
|
||||
age_phase2_success_beyond_cooldown(&runtime).await;
|
||||
let second_phase2_claim = runtime
|
||||
.try_claim_global_phase2_job(owner, /*lease_seconds*/ 3600)
|
||||
.await
|
||||
@@ -4457,10 +4503,7 @@ VALUES (?, ?, ?, ?, ?)
|
||||
.try_claim_global_phase2_job(owner_b, /*lease_seconds*/ 3_600)
|
||||
.await
|
||||
.expect("claim global phase2 lock after success");
|
||||
assert!(
|
||||
matches!(claim_after_success, Phase2JobClaimOutcome::Claimed { .. }),
|
||||
"git workspace diff, not the DB watermark, decides whether the claimed lock has work"
|
||||
);
|
||||
assert_eq!(claim_after_success, Phase2JobClaimOutcome::SkippedCooldown);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
@@ -4583,6 +4626,7 @@ VALUES (?, ?, ?, ?, ?)
|
||||
.await
|
||||
.expect("enqueue lower-watermark consolidation");
|
||||
|
||||
age_phase2_success_beyond_cooldown(&runtime).await;
|
||||
let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner b");
|
||||
let claim_b = runtime
|
||||
.try_claim_global_phase2_job(owner_b, /*lease_seconds*/ 3_600)
|
||||
|
||||
Reference in New Issue
Block a user