diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index b08545d7a..5ae41bcd8 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -384,6 +384,16 @@ async fn open_sqlite( pub(super) async fn ensure_backfill_state_row_in_pool( pool: &sqlx::SqlitePool, ) -> anyhow::Result<()> { + // Eagerly check if the operation would have no effect to avoid blocking waiting for a SQLite + // writer for no reason in the hot startup path. + if sqlx::query_scalar::<_, i64>("SELECT 1 FROM backfill_state WHERE id = 1") + .fetch_optional(pool) + .await? + .is_some() + { + return Ok(()); + } + sqlx::query( r#" INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) diff --git a/codex-rs/state/src/runtime/backfill.rs b/codex-rs/state/src/runtime/backfill.rs index 77ab57fce..3bfa74fb9 100644 --- a/codex-rs/state/src/runtime/backfill.rs +++ b/codex-rs/state/src/runtime/backfill.rs @@ -110,9 +110,11 @@ WHERE id = 1 #[cfg(test)] mod tests { use super::StateRuntime; + use super::base_sqlite_options; use super::test_support::unique_temp_dir; use chrono::Utc; use pretty_assertions::assert_eq; + use sqlx::Connection; #[tokio::test] async fn backfill_state_persists_progress_and_completion() { @@ -167,6 +169,61 @@ mod tests { let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn get_backfill_state_succeeds_while_another_connection_holds_writer_slot() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("initialize runtime"); + let mut write_connection = sqlx::SqliteConnection::connect_with(&base_sqlite_options( + &crate::state_db_path(codex_home.as_path()), + )) + .await + .expect("open write connection"); + let write_transaction = write_connection + .begin_with("BEGIN IMMEDIATE") + .await + .expect("acquire write lock"); + + let state = runtime + .get_backfill_state() + .await + .expect("get backfill state"); + assert_eq!(state, crate::BackfillState::default()); + + write_transaction + .rollback() + .await + .expect("release write lock"); + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn get_backfill_state_repairs_a_missing_singleton_row() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("initialize runtime"); + sqlx::query("DELETE FROM backfill_state WHERE id = 1") + .execute(runtime.pool.as_ref()) + .await + .expect("delete backfill state row"); + + let state = runtime + .get_backfill_state() + .await + .expect("get repaired backfill state"); + assert_eq!(state, crate::BackfillState::default()); + let row_count = + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM backfill_state WHERE id = 1") + .fetch_one(runtime.pool.as_ref()) + .await + .expect("count repaired backfill state rows"); + assert_eq!(row_count, 1); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn backfill_claim_is_singleton_until_stale_and_blocked_when_complete() { let codex_home = unique_temp_dir();