From 4f8c58f73744c6b1cebacb520cde695698745cb8 Mon Sep 17 00:00:00 2001 From: acrognale-oai Date: Wed, 22 Apr 2026 06:10:09 -0400 Subject: [PATCH] Support multiple cwd filters for thread list (#18502) ## Summary - Teach app-server `thread/list` to accept either a single `cwd` or an array of cwd filters, returning threads whose recorded session cwd matches any requested path - Add `useStateDbOnly` as an explicit opt-in fast path for callers that want to answer `thread/list` from SQLite without scanning JSONL rollout files - Preserve backwards compatibility: by default, `thread/list` still scans JSONL rollouts and repairs SQLite state - Wire the new cwd array and SQLite-only options through app-server, local/remote thread-store, rollout listing, generated TypeScript/schema fixtures, proto output, and docs ## Test Plan - `cargo test -p codex-app-server-protocol` - `cargo test -p codex-rollout` - `cargo test -p codex-thread-store` - `cargo test -p codex-app-server thread_list` - `just fmt` - `just fix -p codex-app-server-protocol -p codex-rollout -p codex-thread-store -p codex-app-server` - `cargo build -p codex-cli --bin codex` --- .../schema/json/ClientRequest.json | 31 +- .../codex_app_server_protocol.schemas.json | 31 +- .../codex_app_server_protocol.v2.schemas.json | 31 +- .../schema/json/v2/ThreadListParams.json | 31 +- .../schema/typescript/v2/ThreadListParams.ts | 12 +- .../app-server-protocol/src/protocol/v2.rs | 60 +++- codex-rs/app-server-test-client/src/lib.rs | 1 + codex-rs/app-server/README.md | 4 +- .../app-server/src/codex_message_processor.rs | 103 ++++--- .../app-server/tests/suite/v2/thread_fork.rs | 1 + .../app-server/tests/suite/v2/thread_list.rs | 181 +++++++++++- .../app-server/tests/suite/v2/thread_read.rs | 1 + codex-rs/core/src/personality_migration.rs | 2 + codex-rs/core/src/realtime_context.rs | 2 + codex-rs/debug-client/src/client.rs | 1 + codex-rs/exec/src/lib.rs | 2 + codex-rs/rollout/src/list.rs | 68 ++++- codex-rs/rollout/src/recorder.rs | 256 ++++++++++++++--- codex-rs/rollout/src/recorder_tests.rs | 272 ++++++++++++++++++ codex-rs/rollout/src/state_db.rs | 8 + codex-rs/rollout/src/tests.rs | 18 ++ .../0027_threads_cwd_sort_indexes.sql | 2 + codex-rs/state/src/runtime/memories.rs | 1 + codex-rs/state/src/runtime/threads.rs | 91 ++++++ .../thread-store/src/local/archive_thread.rs | 2 + .../thread-store/src/local/list_threads.rs | 44 ++- .../thread-store/src/remote/list_threads.rs | 16 ++ .../remote/proto/codex.thread_store.v1.proto | 6 + .../src/remote/proto/codex.thread_store.v1.rs | 9 + codex-rs/thread-store/src/types.rs | 5 + codex-rs/tui/src/lib.rs | 15 +- codex-rs/tui/src/resume_picker.rs | 9 +- 32 files changed, 1183 insertions(+), 133 deletions(-) create mode 100644 codex-rs/state/migrations/0027_threads_cwd_sort_indexes.sql diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 3ddced86c..66718b105 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3082,6 +3082,19 @@ ], "type": "object" }, + "ThreadListCwdFilter": { + "anyOf": [ + { + "type": "string" + }, + { + "items": { + "type": "string" + }, + "type": "array" + } + ] + }, "ThreadListParams": { "properties": { "archived": { @@ -3099,11 +3112,15 @@ ] }, "cwd": { - "description": "Optional cwd filter; when set, only threads whose session cwd exactly matches this path are returned.", - "type": [ - "string", - "null" - ] + "anyOf": [ + { + "$ref": "#/definitions/ThreadListCwdFilter" + }, + { + "type": "null" + } + ], + "description": "Optional cwd filter or filters; when set, only threads whose session cwd exactly matches one of these paths are returned." }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", @@ -3162,6 +3179,10 @@ "array", "null" ] + }, + "useStateDbOnly": { + "description": "If true, return from the state DB without scanning JSONL rollouts to repair thread metadata. Omitted or false preserves scan-and-repair behavior.", + "type": "boolean" } }, "type": "object" diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 231df093f..33b0cde98 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -14868,6 +14868,19 @@ } ] }, + "ThreadListCwdFilter": { + "anyOf": [ + { + "type": "string" + }, + { + "items": { + "type": "string" + }, + "type": "array" + } + ] + }, "ThreadListParams": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { @@ -14886,11 +14899,15 @@ ] }, "cwd": { - "description": "Optional cwd filter; when set, only threads whose session cwd exactly matches this path are returned.", - "type": [ - "string", - "null" - ] + "anyOf": [ + { + "$ref": "#/definitions/v2/ThreadListCwdFilter" + }, + { + "type": "null" + } + ], + "description": "Optional cwd filter or filters; when set, only threads whose session cwd exactly matches one of these paths are returned." }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", @@ -14949,6 +14966,10 @@ "array", "null" ] + }, + "useStateDbOnly": { + "description": "If true, return from the state DB without scanning JSONL rollouts to repair thread metadata. Omitted or false preserves scan-and-repair behavior.", + "type": "boolean" } }, "title": "ThreadListParams", diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 842043ff3..ca0f17a26 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -12762,6 +12762,19 @@ } ] }, + "ThreadListCwdFilter": { + "anyOf": [ + { + "type": "string" + }, + { + "items": { + "type": "string" + }, + "type": "array" + } + ] + }, "ThreadListParams": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { @@ -12780,11 +12793,15 @@ ] }, "cwd": { - "description": "Optional cwd filter; when set, only threads whose session cwd exactly matches this path are returned.", - "type": [ - "string", - "null" - ] + "anyOf": [ + { + "$ref": "#/definitions/ThreadListCwdFilter" + }, + { + "type": "null" + } + ], + "description": "Optional cwd filter or filters; when set, only threads whose session cwd exactly matches one of these paths are returned." }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", @@ -12843,6 +12860,10 @@ "array", "null" ] + }, + "useStateDbOnly": { + "description": "If true, return from the state DB without scanning JSONL rollouts to repair thread metadata. Omitted or false preserves scan-and-repair behavior.", + "type": "boolean" } }, "title": "ThreadListParams", diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListParams.json index 62a7dc9c5..789d9b61f 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListParams.json @@ -8,6 +8,19 @@ ], "type": "string" }, + "ThreadListCwdFilter": { + "anyOf": [ + { + "type": "string" + }, + { + "items": { + "type": "string" + }, + "type": "array" + } + ] + }, "ThreadSortKey": { "enum": [ "created_at", @@ -47,11 +60,15 @@ ] }, "cwd": { - "description": "Optional cwd filter; when set, only threads whose session cwd exactly matches this path are returned.", - "type": [ - "string", - "null" - ] + "anyOf": [ + { + "$ref": "#/definitions/ThreadListCwdFilter" + }, + { + "type": "null" + } + ], + "description": "Optional cwd filter or filters; when set, only threads whose session cwd exactly matches one of these paths are returned." }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", @@ -110,6 +127,10 @@ "array", "null" ] + }, + "useStateDbOnly": { + "description": "If true, return from the state DB without scanning JSONL rollouts to repair thread metadata. Omitted or false preserves scan-and-repair behavior.", + "type": "boolean" } }, "title": "ThreadListParams", diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadListParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadListParams.ts index a2add1768..ce5b6a79b 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadListParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadListParams.ts @@ -38,10 +38,16 @@ sourceKinds?: Array | null, */ archived?: boolean | null, /** - * Optional cwd filter; when set, only threads whose session cwd exactly - * matches this path are returned. + * Optional cwd filter or filters; when set, only threads whose session cwd + * exactly matches one of these paths are returned. */ -cwd?: string | null, +cwd?: string | Array | null, +/** + * If true, return from the state DB without scanning JSONL rollouts to + * repair thread metadata. Omitted or false preserves scan-and-repair + * behavior. + */ +useStateDbOnly?: boolean, /** * Optional substring filter for the extracted thread title. */ diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 1720b5005..eabb32ecf 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3703,15 +3703,27 @@ pub struct ThreadListParams { /// If false or null, only non-archived threads are returned. #[ts(optional = nullable)] pub archived: Option, - /// Optional cwd filter; when set, only threads whose session cwd exactly - /// matches this path are returned. - #[ts(optional = nullable)] - pub cwd: Option, + /// Optional cwd filter or filters; when set, only threads whose session cwd + /// exactly matches one of these paths are returned. + #[ts(optional = nullable, type = "string | Array | null")] + pub cwd: Option, + /// If true, return from the state DB without scanning JSONL rollouts to + /// repair thread metadata. Omitted or false preserves scan-and-repair + /// behavior. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub use_state_db_only: bool, /// Optional substring filter for the extracted thread title. #[ts(optional = nullable)] pub search_term: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)] +#[serde(untagged)] +pub enum ThreadListCwdFilter { + One(String), + Many(Vec), +} + #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(rename_all = "camelCase", export_to = "v2/")] @@ -7294,6 +7306,46 @@ mod tests { absolute_path("readable") } + #[test] + fn thread_list_params_accepts_single_cwd() { + let params = serde_json::from_value::(json!({ + "cwd": "/workspace", + })) + .expect("single cwd should deserialize"); + + assert_eq!( + params.cwd, + Some(ThreadListCwdFilter::One("/workspace".to_string())) + ); + assert!(!params.use_state_db_only); + } + + #[test] + fn thread_list_params_accepts_multiple_cwds() { + let params = serde_json::from_value::(json!({ + "cwd": ["/workspace", "/other-workspace"], + })) + .expect("cwd array should deserialize"); + + assert_eq!( + params.cwd, + Some(ThreadListCwdFilter::Many(vec![ + "/workspace".to_string(), + "/other-workspace".to_string(), + ])) + ); + } + + #[test] + fn thread_list_params_accepts_state_db_only_flag() { + let params = serde_json::from_value::(json!({ + "useStateDbOnly": true, + })) + .expect("state db only flag should deserialize"); + + assert!(params.use_state_db_only); + } + #[test] fn collab_agent_state_maps_interrupted_status() { assert_eq!( diff --git a/codex-rs/app-server-test-client/src/lib.rs b/codex-rs/app-server-test-client/src/lib.rs index 6f6e0dfc8..cf28cb151 100644 --- a/codex-rs/app-server-test-client/src/lib.rs +++ b/codex-rs/app-server-test-client/src/lib.rs @@ -1129,6 +1129,7 @@ async fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u3 source_kinds: None, archived: None, cwd: None, + use_state_db_only: false, search_term: None, })?; println!("< thread/list response: {response:?}"); diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 91ada0ab8..2452cba36 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -290,7 +290,8 @@ Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `per - `modelProviders` — restrict results to specific providers; unset, null, or an empty array will include all providers. - `sourceKinds` — restrict results to specific sources; omit or pass `[]` for interactive sessions only (`cli`, `vscode`). - `archived` — when `true`, list archived threads only. When `false` or `null`, list non-archived threads (default). -- `cwd` — restrict results to threads whose session cwd exactly matches this path. Relative paths are resolved against the app-server process cwd before matching. +- `cwd` — restrict results to threads whose session cwd exactly matches this path, or one of these paths when an array is provided. Relative paths are resolved against the app-server process cwd before matching. +- `useStateDbOnly` — when `true`, return from the state DB without scanning JSONL rollouts to repair metadata. Omit or pass `false` to preserve the default scan-and-repair behavior. - `searchTerm` — restrict results to threads whose extracted title contains this substring (case-sensitive). - Responses include `nextCursor` to continue in the same direction and `backwardsCursor` to pass as `cursor` when reversing `sortDirection`. - Responses include `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available. @@ -301,6 +302,7 @@ Example: { "method": "thread/list", "id": 20, "params": { "cursor": null, "limit": 25, + "cwd": ["/Users/me/project", "/Users/me/project-worktree"], "sortKey": "created_at" } } { "id": 20, "result": { diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index d6efa1050..c8b9200e3 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -150,6 +150,7 @@ use codex_app_server_protocol::ThreadIncrementElicitationResponse; use codex_app_server_protocol::ThreadInjectItemsParams; use codex_app_server_protocol::ThreadInjectItemsResponse; use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::ThreadListCwdFilter; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; use codex_app_server_protocol::ThreadLoadedListParams; @@ -402,8 +403,9 @@ struct ThreadListFilters { model_providers: Option>, source_kinds: Option>, archived: bool, - cwd: Option, + cwd_filters: Option>, search_term: Option, + use_state_db_only: bool, } // Duration before a browser ChatGPT login attempt is abandoned. @@ -3722,10 +3724,11 @@ impl CodexMessageProcessor { source_kinds, archived, cwd, + use_state_db_only, search_term, } = params; - let cwd = match normalize_thread_list_cwd_filter(cwd) { - Ok(cwd) => cwd, + let cwd_filters = match normalize_thread_list_cwd_filters(cwd) { + Ok(cwd_filters) => cwd_filters, Err(error) => { self.outgoing.send_error(request_id, error).await; return; @@ -3751,8 +3754,9 @@ impl CodexMessageProcessor { model_providers, source_kinds, archived: archived.unwrap_or(false), - cwd, + cwd_filters, search_term, + use_state_db_only, }, ) .await; @@ -5217,8 +5221,9 @@ impl CodexMessageProcessor { model_providers, source_kinds, archived, - cwd, + cwd_filters, search_term, + use_state_db_only, } = filters; let mut cursor_obj = cursor; let mut last_cursor = cursor_obj.clone(); @@ -5255,28 +5260,27 @@ impl CodexMessageProcessor { sort_direction: store_sort_direction, allowed_sources: allowed_sources.to_vec(), model_providers: model_provider_filter.clone(), + cwd_filters: cwd_filters.clone(), archived, search_term: search_term.clone(), + use_state_db_only, }) .await .map_err(thread_store_list_error)?; - let mut candidate_summaries = Vec::with_capacity(page.items.len()); + let mut filtered = Vec::with_capacity(page.items.len()); for it in page.items { let Some(summary) = summary_from_stored_thread(it, fallback_provider.as_str()) else { continue; }; - candidate_summaries.push(summary); - } - - let mut filtered = Vec::with_capacity(candidate_summaries.len()); - for summary in candidate_summaries { if source_kind_filter .as_ref() .is_none_or(|filter| source_kind_matches(&summary.source, filter)) - && cwd.as_ref().is_none_or(|expected_cwd| { - path_utils::paths_match_after_normalization(&summary.cwd, expected_cwd) + && cwd_filters.as_ref().is_none_or(|expected_cwds| { + expected_cwds.iter().any(|expected_cwd| { + path_utils::paths_match_after_normalization(&summary.cwd, expected_cwd) + }) }) { filtered.push(summary); @@ -5288,25 +5292,22 @@ impl CodexMessageProcessor { items.extend(filtered); remaining = requested_page_size.saturating_sub(items.len()); - let next_cursor_value = page.next_cursor.clone(); - next_cursor = next_cursor_value.clone(); + next_cursor = page.next_cursor; if remaining == 0 { break; } - match next_cursor_value { - Some(cursor_val) if remaining > 0 => { - // Break if our pagination would reuse the same cursor again; this avoids - // an infinite loop when filtering drops everything on the page. - if last_cursor.as_ref() == Some(&cursor_val) { - next_cursor = None; - break; - } - last_cursor = Some(cursor_val.clone()); - cursor_obj = Some(cursor_val); - } - _ => break, + let Some(cursor_val) = next_cursor.clone() else { + break; + }; + // Break if our pagination would reuse the same cursor again; this avoids + // an infinite loop when filtering drops everything on the page. + if last_cursor.as_ref() == Some(&cursor_val) { + next_cursor = None; + break; } + last_cursor = Some(cursor_val.clone()); + cursor_obj = Some(cursor_val); } Ok((items, next_cursor)) @@ -8273,25 +8274,36 @@ impl CodexMessageProcessor { } } -fn normalize_thread_list_cwd_filter( - cwd: Option, -) -> Result, JSONRPCErrorError> { +fn normalize_thread_list_cwd_filters( + cwd: Option, +) -> Result>, JSONRPCErrorError> { let Some(cwd) = cwd else { return Ok(None); }; - AbsolutePathBuf::relative_to_current_dir(cwd.as_str()) - .map(AbsolutePathBuf::into_path_buf) - .map(Some) - .map_err(|err| JSONRPCErrorError { - code: INVALID_PARAMS_ERROR_CODE, - message: format!("invalid thread/list cwd filter `{cwd}`: {err}"), - data: None, - }) + + let cwds = match cwd { + ThreadListCwdFilter::One(cwd) => vec![cwd], + ThreadListCwdFilter::Many(cwds) => cwds, + }; + let mut normalized_cwds = Vec::with_capacity(cwds.len()); + for cwd in cwds { + let cwd = AbsolutePathBuf::relative_to_current_dir(cwd.as_str()) + .map(AbsolutePathBuf::into_path_buf) + .map_err(|err| JSONRPCErrorError { + code: INVALID_PARAMS_ERROR_CODE, + message: format!("invalid thread/list cwd filter `{cwd}`: {err}"), + data: None, + })?; + normalized_cwds.push(cwd); + } + + Ok(Some(normalized_cwds)) } #[cfg(test)] mod thread_list_cwd_filter_tests { - use super::normalize_thread_list_cwd_filter; + use super::normalize_thread_list_cwd_filters; + use codex_app_server_protocol::ThreadListCwdFilter; use codex_utils_absolute_path::AbsolutePathBuf; use pretty_assertions::assert_eq; use std::path::PathBuf; @@ -8305,8 +8317,9 @@ mod thread_list_cwd_filter_tests { }; assert_eq!( - normalize_thread_list_cwd_filter(Some(cwd.clone())).expect("cwd filter should parse"), - Some(PathBuf::from(cwd)) + normalize_thread_list_cwd_filters(Some(ThreadListCwdFilter::One(cwd.clone()))) + .expect("cwd filter should parse"), + Some(vec![PathBuf::from(cwd)]) ); } @@ -8316,9 +8329,11 @@ mod thread_list_cwd_filter_tests { let expected = AbsolutePathBuf::relative_to_current_dir("repo-b")?.to_path_buf(); assert_eq!( - normalize_thread_list_cwd_filter(Some(String::from("repo-b"))) - .expect("cwd filter should parse"), - Some(expected) + normalize_thread_list_cwd_filters(Some(ThreadListCwdFilter::Many(vec![String::from( + "repo-b" + ),]))) + .expect("cwd filter should parse"), + Some(vec![expected]) ); Ok(()) } diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index 89980c7c6..0425ce3e3 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -548,6 +548,7 @@ async fn thread_fork_ephemeral_remains_pathless_and_omits_listing() -> Result<() source_kinds: None, archived: None, cwd: None, + use_state_db_only: false, search_term: None, }) .await?; diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 247bbb412..615692d70 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -15,6 +15,7 @@ use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SessionSource; use codex_app_server_protocol::SortDirection; +use codex_app_server_protocol::ThreadListCwdFilter; use codex_app_server_protocol::ThreadListResponse; use codex_app_server_protocol::ThreadSortKey; use codex_app_server_protocol::ThreadSourceKind; @@ -90,6 +91,7 @@ async fn list_threads_with_sort( source_kinds, archived, cwd: None, + use_state_db_only: false, search_term: None, }) .await?; @@ -468,15 +470,23 @@ async fn thread_list_respects_provider_filter() -> Result<()> { } #[tokio::test] -async fn thread_list_respects_cwd_filter() -> Result<()> { +async fn thread_list_respects_cwd_filters() -> Result<()> { let codex_home = TempDir::new()?; create_minimal_config(codex_home.path())?; - let filtered_id = create_fake_rollout( + let first_filtered_id = create_fake_rollout( codex_home.path(), "2025-01-02T10-00-00", "2025-01-02T10:00:00Z", - "filtered", + "first filtered", + Some("mock_provider"), + /*git_info*/ None, + )?; + let second_filtered_id = create_fake_rollout( + codex_home.path(), + "2025-01-02T12-00-00", + "2025-01-02T12:00:00Z", + "second filtered", Some("mock_provider"), /*git_info*/ None, )?; @@ -489,11 +499,22 @@ async fn thread_list_respects_cwd_filter() -> Result<()> { /*git_info*/ None, )?; - let target_cwd = codex_home.path().join("target-cwd"); - fs::create_dir_all(&target_cwd)?; + let first_target_cwd = codex_home.path().join("first-target-cwd"); + let second_target_cwd = codex_home.path().join("second-target-cwd"); + fs::create_dir_all(&first_target_cwd)?; + fs::create_dir_all(&second_target_cwd)?; set_rollout_cwd( - rollout_path(codex_home.path(), "2025-01-02T10-00-00", &filtered_id).as_path(), - &target_cwd, + rollout_path(codex_home.path(), "2025-01-02T10-00-00", &first_filtered_id).as_path(), + &first_target_cwd, + )?; + set_rollout_cwd( + rollout_path( + codex_home.path(), + "2025-01-02T12-00-00", + &second_filtered_id, + ) + .as_path(), + &second_target_cwd, )?; let mut mcp = init_mcp(codex_home.path()).await?; @@ -506,7 +527,11 @@ async fn thread_list_respects_cwd_filter() -> Result<()> { model_providers: Some(vec!["mock_provider".to_string()]), source_kinds: None, archived: None, - cwd: Some(target_cwd.to_string_lossy().into_owned()), + cwd: Some(ThreadListCwdFilter::Many(vec![ + first_target_cwd.to_string_lossy().into_owned(), + second_target_cwd.to_string_lossy().into_owned(), + ])), + use_state_db_only: false, search_term: None, }) .await?; @@ -520,10 +545,14 @@ async fn thread_list_respects_cwd_filter() -> Result<()> { } = to_response::(resp)?; assert_eq!(next_cursor, None); - assert_eq!(data.len(), 1); - assert_eq!(data[0].id, filtered_id); - assert_ne!(data[0].id, unfiltered_id); - assert_eq!(data[0].cwd.as_path(), target_cwd.as_path()); + let filtered_ids: Vec<_> = data.iter().map(|thread| thread.id.as_str()).collect(); + assert_eq!( + filtered_ids, + vec![second_filtered_id.as_str(), first_filtered_id.as_str()] + ); + assert!(!filtered_ids.contains(&unfiltered_id.as_str())); + assert_eq!(data[0].cwd.as_path(), second_target_cwd.as_path()); + assert_eq!(data[1].cwd.as_path(), first_target_cwd.as_path()); Ok(()) } @@ -592,6 +621,7 @@ sqlite = true codex_core::SortDirection::Desc, &[], /*model_providers*/ None, + /*cwd_filters*/ None, "mock_provider", /*search_term*/ None, ) @@ -609,6 +639,7 @@ sqlite = true source_kinds: None, archived: None, cwd: None, + use_state_db_only: false, search_term: Some("needle".to_string()), }) .await?; @@ -628,6 +659,129 @@ sqlite = true Ok(()) } +#[tokio::test] +async fn thread_list_state_db_only_returns_sqlite_without_jsonl_repair() -> Result<()> { + let codex_home = TempDir::new()?; + std::fs::write( + codex_home.path().join("config.toml"), + r#" +model = "mock-model" +approval_policy = "never" +suppress_unstable_features_warning = true + +[features] +sqlite = true +"#, + )?; + + let thread_id = create_fake_rollout( + codex_home.path(), + "2025-01-02T10-00-00", + "2025-01-02T10:00:00Z", + "state db only should not see this before repair", + Some("mock_provider"), + /*git_info*/ None, + )?; + let state_db = + codex_state::StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()) + .await?; + state_db + .mark_backfill_complete(/*last_watermark*/ None) + .await?; + let mut mcp = init_mcp(codex_home.path()).await?; + + let request_id = mcp + .send_thread_list_request(codex_app_server_protocol::ThreadListParams { + cursor: None, + limit: Some(10), + sort_key: None, + sort_direction: None, + model_providers: Some(vec!["mock_provider".to_string()]), + source_kinds: None, + archived: None, + cwd: None, + use_state_db_only: false, + search_term: None, + }) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let repaired_response = to_response::(resp)?; + let ids: Vec<_> = repaired_response + .data + .iter() + .map(|thread| thread.id.as_str()) + .collect(); + assert_eq!(ids, vec![thread_id.as_str()]); + + let thread_uuid = ThreadId::from_string(&thread_id)?; + let stale_cwd = codex_home.path().join("stale-cwd"); + let mut metadata = state_db + .get_thread(thread_uuid) + .await? + .expect("thread should be repaired into sqlite"); + metadata.cwd = stale_cwd.clone(); + state_db.upsert_thread(&metadata).await?; + + let request_id = mcp + .send_thread_list_request(codex_app_server_protocol::ThreadListParams { + cursor: None, + limit: Some(10), + sort_key: None, + sort_direction: None, + model_providers: Some(vec!["mock_provider".to_string()]), + source_kinds: None, + archived: None, + cwd: Some(ThreadListCwdFilter::One( + stale_cwd.to_string_lossy().into_owned(), + )), + use_state_db_only: true, + search_term: None, + }) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let state_db_only_response = to_response::(resp)?; + let ids: Vec<_> = state_db_only_response + .data + .iter() + .map(|thread| thread.id.as_str()) + .collect(); + assert_eq!(ids, vec![thread_id.as_str()]); + + let request_id = mcp + .send_thread_list_request(codex_app_server_protocol::ThreadListParams { + cursor: None, + limit: Some(10), + sort_key: None, + sort_direction: None, + model_providers: Some(vec!["mock_provider".to_string()]), + source_kinds: None, + archived: None, + cwd: Some(ThreadListCwdFilter::One( + stale_cwd.to_string_lossy().into_owned(), + )), + use_state_db_only: false, + search_term: None, + }) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let scanned_response = to_response::(resp)?; + assert_eq!(scanned_response.data.len(), 0); + + Ok(()) +} + #[tokio::test] async fn thread_list_empty_source_kinds_defaults_to_interactive_only() -> Result<()> { let codex_home = TempDir::new()?; @@ -1307,6 +1461,7 @@ async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<() source_kinds: None, archived: None, cwd: None, + use_state_db_only: false, search_term: None, }) .await?; @@ -1348,6 +1503,7 @@ async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<() source_kinds: None, archived: None, cwd: None, + use_state_db_only: false, search_term: None, }) .await?; @@ -1585,6 +1741,7 @@ async fn thread_list_invalid_cursor_returns_error() -> Result<()> { source_kinds: None, archived: None, cwd: None, + use_state_db_only: false, search_term: None, }) .await?; diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 7cee2f099..8e0e253ac 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -548,6 +548,7 @@ async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> { source_kinds: None, archived: None, cwd: None, + use_state_db_only: false, search_term: None, }) .await?; diff --git a/codex-rs/core/src/personality_migration.rs b/codex-rs/core/src/personality_migration.rs index ae24499e0..19bf00284 100644 --- a/codex-rs/core/src/personality_migration.rs +++ b/codex-rs/core/src/personality_migration.rs @@ -82,8 +82,10 @@ async fn has_threads(store: &LocalThreadStore, archived: bool) -> io::Result Vec { sort_direction: SortDirection::Desc, allowed_sources: Vec::new(), model_providers: None, + cwd_filters: None, archived: false, search_term: None, + use_state_db_only: false, }) .await { diff --git a/codex-rs/debug-client/src/client.rs b/codex-rs/debug-client/src/client.rs index c8cf0dd72..2edabfac0 100644 --- a/codex-rs/debug-client/src/client.rs +++ b/codex-rs/debug-client/src/client.rs @@ -183,6 +183,7 @@ impl AppServerClient { source_kinds: None, archived: None, cwd: None, + use_state_db_only: false, search_term: None, }, }; diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 11bf1b7a6..ca3b65077 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -1256,6 +1256,7 @@ async fn resolve_resume_thread_id( source_kinds: Some(all_thread_source_kinds()), archived: Some(false), cwd: None, + use_state_db_only: false, search_term: None, }, }, @@ -1319,6 +1320,7 @@ async fn resolve_resume_thread_id( source_kinds: Some(all_thread_source_kinds()), archived: Some(false), cwd: None, + use_state_db_only: false, search_term: Some(session_id.to_string()), }, }, diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index bf09d15e8..bdb719883 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -1,6 +1,7 @@ #![allow(warnings, clippy::all)] use async_trait::async_trait; +use codex_utils_path as path_utils; use std::cmp::Reverse; use std::ffi::OsStr; use std::io; @@ -126,6 +127,7 @@ pub enum ThreadListLayout { pub struct ThreadListConfig<'a> { pub allowed_sources: &'a [SessionSource], pub model_providers: Option<&'a [String]>, + pub cwd_filters: Option<&'a [PathBuf]>, pub default_provider: &'a str, pub layout: ThreadListLayout, } @@ -207,6 +209,7 @@ struct FilesByCreatedAtVisitor<'a> { more_matches_available: bool, allowed_sources: &'a [SessionSource], provider_matcher: Option<&'a ProviderMatcher<'a>>, + cwd_filters: Option<&'a [PathBuf]>, } #[async_trait] @@ -237,6 +240,7 @@ impl<'a> RolloutFileVisitor for FilesByCreatedAtVisitor<'a> { path, self.allowed_sources, self.provider_matcher, + self.cwd_filters, updated_at, ) .await @@ -317,6 +321,7 @@ pub async fn get_threads( sort_key: ThreadSortKey, allowed_sources: &[SessionSource], model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, default_provider: &str, ) -> io::Result { let root = codex_home.join(SESSIONS_SUBDIR); @@ -328,6 +333,7 @@ pub async fn get_threads( ThreadListConfig { allowed_sources, model_providers, + cwd_filters, default_provider, layout: ThreadListLayout::NestedByDate, }, @@ -366,6 +372,7 @@ pub async fn get_threads_in_root( sort_key, config.allowed_sources, provider_matcher.as_ref(), + config.cwd_filters, ) .await? } @@ -377,6 +384,7 @@ pub async fn get_threads_in_root( sort_key, config.allowed_sources, provider_matcher.as_ref(), + config.cwd_filters, ) .await? } @@ -395,6 +403,7 @@ async fn traverse_directories_for_paths( sort_key: ThreadSortKey, allowed_sources: &[SessionSource], provider_matcher: Option<&ProviderMatcher<'_>>, + cwd_filters: Option<&[PathBuf]>, ) -> io::Result { match sort_key { ThreadSortKey::CreatedAt => { @@ -404,6 +413,7 @@ async fn traverse_directories_for_paths( anchor, allowed_sources, provider_matcher, + cwd_filters, ) .await } @@ -414,6 +424,7 @@ async fn traverse_directories_for_paths( anchor, allowed_sources, provider_matcher, + cwd_filters, ) .await } @@ -427,15 +438,30 @@ async fn traverse_flat_paths( sort_key: ThreadSortKey, allowed_sources: &[SessionSource], provider_matcher: Option<&ProviderMatcher<'_>>, + cwd_filters: Option<&[PathBuf]>, ) -> io::Result { match sort_key { ThreadSortKey::CreatedAt => { - traverse_flat_paths_created(root, page_size, anchor, allowed_sources, provider_matcher) - .await + traverse_flat_paths_created( + root, + page_size, + anchor, + allowed_sources, + provider_matcher, + cwd_filters, + ) + .await } ThreadSortKey::UpdatedAt => { - traverse_flat_paths_updated(root, page_size, anchor, allowed_sources, provider_matcher) - .await + traverse_flat_paths_updated( + root, + page_size, + anchor, + allowed_sources, + provider_matcher, + cwd_filters, + ) + .await } } } @@ -452,6 +478,7 @@ async fn traverse_directories_for_paths_created( anchor: Option, allowed_sources: &[SessionSource], provider_matcher: Option<&ProviderMatcher<'_>>, + cwd_filters: Option<&[PathBuf]>, ) -> io::Result { let mut items: Vec = Vec::with_capacity(page_size); let mut scanned_files = 0usize; @@ -463,6 +490,7 @@ async fn traverse_directories_for_paths_created( more_matches_available, allowed_sources, provider_matcher, + cwd_filters, }; walk_rollout_files(&root, &mut scanned_files, &mut visitor).await?; more_matches_available = visitor.more_matches_available; @@ -499,14 +527,14 @@ async fn traverse_directories_for_paths_updated( anchor: Option, allowed_sources: &[SessionSource], provider_matcher: Option<&ProviderMatcher<'_>>, + cwd_filters: Option<&[PathBuf]>, ) -> io::Result { let mut items: Vec = Vec::with_capacity(page_size); let mut scanned_files = 0usize; let mut anchor_state = AnchorState::new(anchor); let mut more_matches_available = false; - let candidates = collect_files_by_updated_at(&root, &mut scanned_files).await?; - let mut candidates = candidates; + let mut candidates = collect_files_by_updated_at(&root, &mut scanned_files).await?; candidates.sort_by_key(|candidate| { let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH); (Reverse(ts), Reverse(candidate.id)) @@ -527,6 +555,7 @@ async fn traverse_directories_for_paths_updated( candidate.path, allowed_sources, provider_matcher, + cwd_filters, updated_at_fallback, ) .await @@ -559,6 +588,7 @@ async fn traverse_flat_paths_created( anchor: Option, allowed_sources: &[SessionSource], provider_matcher: Option<&ProviderMatcher<'_>>, + cwd_filters: Option<&[PathBuf]>, ) -> io::Result { let mut items: Vec = Vec::with_capacity(page_size); let mut scanned_files = 0usize; @@ -578,8 +608,14 @@ async fn traverse_flat_paths_created( .await .unwrap_or(None) .and_then(format_rfc3339); - if let Some(item) = - build_thread_item(path, allowed_sources, provider_matcher, updated_at).await + if let Some(item) = build_thread_item( + path, + allowed_sources, + provider_matcher, + cwd_filters, + updated_at, + ) + .await { items.push(item); } @@ -609,14 +645,14 @@ async fn traverse_flat_paths_updated( anchor: Option, allowed_sources: &[SessionSource], provider_matcher: Option<&ProviderMatcher<'_>>, + cwd_filters: Option<&[PathBuf]>, ) -> io::Result { let mut items: Vec = Vec::with_capacity(page_size); let mut scanned_files = 0usize; let mut anchor_state = AnchorState::new(anchor); let mut more_matches_available = false; - let candidates = collect_flat_files_by_updated_at(&root, &mut scanned_files).await?; - let mut candidates = candidates; + let mut candidates = collect_flat_files_by_updated_at(&root, &mut scanned_files).await?; candidates.sort_by_key(|candidate| { let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH); (Reverse(ts), Reverse(candidate.id)) @@ -637,6 +673,7 @@ async fn traverse_flat_paths_updated( candidate.path, allowed_sources, provider_matcher, + cwd_filters, updated_at_fallback, ) .await @@ -698,6 +735,7 @@ async fn build_thread_item( path: PathBuf, allowed_sources: &[SessionSource], provider_matcher: Option<&ProviderMatcher<'_>>, + cwd_filters: Option<&[PathBuf]>, updated_at: Option, ) -> Option { // Read head and detect message events; stop once meta + user are found. @@ -717,6 +755,15 @@ async fn build_thread_item( { return None; } + if let Some(cwd_filters) = cwd_filters + && !summary.cwd.as_ref().is_some_and(|cwd| { + cwd_filters + .iter() + .any(|filter| path_utils::paths_match_after_normalization(cwd, filter)) + }) + { + return None; + } // Apply filters: must have session meta and at least one user message event if summary.saw_session_meta && summary.saw_user_event { let HeadTailSummary { @@ -768,6 +815,7 @@ pub async fn read_thread_item_from_rollout(path: PathBuf) -> Option path, &[], /*provider_matcher*/ None, + /*cwd_filters*/ None, /*updated_at*/ None, ) .await diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 871d53141..184cfc191 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -214,6 +214,18 @@ fn sanitize_rollout_item_for_persistence( } } +#[derive(Clone, Copy)] +enum ThreadListArchiveFilter { + Active, + Archived, +} + +#[derive(Clone, Copy)] +enum ThreadListRepairMode { + ScanAndRepair, + StateDbOnly, +} + impl RolloutRecorder { /// List threads (rollout files) under the provided Codex home directory. #[allow(clippy::too_many_arguments)] @@ -225,6 +237,7 @@ impl RolloutRecorder { sort_direction: SortDirection, allowed_sources: &[SessionSource], model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, default_provider: &str, search_term: Option<&str>, ) -> std::io::Result { @@ -236,8 +249,40 @@ impl RolloutRecorder { sort_direction, allowed_sources, model_providers, + cwd_filters, default_provider, - /*archived*/ false, + ThreadListArchiveFilter::Active, + ThreadListRepairMode::ScanAndRepair, + search_term, + ) + .await + } + + #[allow(clippy::too_many_arguments)] + pub async fn list_threads_from_state_db( + config: &impl RolloutConfigView, + page_size: usize, + cursor: Option<&Cursor>, + sort_key: ThreadSortKey, + sort_direction: SortDirection, + allowed_sources: &[SessionSource], + model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, + default_provider: &str, + search_term: Option<&str>, + ) -> std::io::Result { + Self::list_threads_with_db_fallback( + config, + page_size, + cursor, + sort_key, + sort_direction, + allowed_sources, + model_providers, + cwd_filters, + default_provider, + ThreadListArchiveFilter::Active, + ThreadListRepairMode::StateDbOnly, search_term, ) .await @@ -253,6 +298,7 @@ impl RolloutRecorder { sort_direction: SortDirection, allowed_sources: &[SessionSource], model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, default_provider: &str, search_term: Option<&str>, ) -> std::io::Result { @@ -264,8 +310,40 @@ impl RolloutRecorder { sort_direction, allowed_sources, model_providers, + cwd_filters, default_provider, - /*archived*/ true, + ThreadListArchiveFilter::Archived, + ThreadListRepairMode::ScanAndRepair, + search_term, + ) + .await + } + + #[allow(clippy::too_many_arguments)] + pub async fn list_archived_threads_from_state_db( + config: &impl RolloutConfigView, + page_size: usize, + cursor: Option<&Cursor>, + sort_key: ThreadSortKey, + sort_direction: SortDirection, + allowed_sources: &[SessionSource], + model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, + default_provider: &str, + search_term: Option<&str>, + ) -> std::io::Result { + Self::list_threads_with_db_fallback( + config, + page_size, + cursor, + sort_key, + sort_direction, + allowed_sources, + model_providers, + cwd_filters, + default_provider, + ThreadListArchiveFilter::Archived, + ThreadListRepairMode::StateDbOnly, search_term, ) .await @@ -280,19 +358,24 @@ impl RolloutRecorder { sort_direction: SortDirection, allowed_sources: &[SessionSource], model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, default_provider: &str, - archived: bool, + archive_filter: ThreadListArchiveFilter, + repair_mode: ThreadListRepairMode, search_term: Option<&str>, ) -> std::io::Result { let codex_home = config.codex_home(); let state_db_ctx = state_db::get_state_db(config).await; + let archived = match archive_filter { + ThreadListArchiveFilter::Active => false, + ThreadListArchiveFilter::Archived => true, + }; + if cwd_filters.is_some_and(<[std::path::PathBuf]>::is_empty) { + return Ok(ThreadsPage::default()); + } - // Search is the SQLite-optimized path and assumes a DB marked backfill-complete is - // actually populated enough to answer the query. If unmigrated rollout files still exist - // on disk, the repair path below may or may not run and catch them depending on whether - // SQLite already has another matching search hit. - if search_term.is_some() - && let Some(db_page) = state_db::list_threads_db( + if matches!(repair_mode, ThreadListRepairMode::StateDbOnly) { + return Ok(state_db::list_threads_db( state_db_ctx.as_deref(), codex_home, page_size, @@ -301,18 +384,22 @@ impl RolloutRecorder { sort_direction, allowed_sources, model_providers, + cwd_filters, archived, search_term, ) .await - && (!db_page.items.is_empty() || cursor.is_some()) - { - return Ok(db_page.into()); + .map(Into::into) + .unwrap_or_default()); } + let listing_has_metadata_filters = !allowed_sources.is_empty() + || model_providers.is_some() + || cwd_filters.is_some() + || search_term.is_some(); // Filesystem-first listing intentionally overfetches so we can repair stale/missing - // SQLite rollout paths before the final DB-backed page is returned. - let fs_page_size = page_size.saturating_mul(2).max(page_size); + // SQLite rows before returning the scan page for filtered listings or the DB page for + // unfiltered listings. let fs_page = match sort_direction { SortDirection::Asc => { list_threads_from_files_asc( @@ -322,6 +409,7 @@ impl RolloutRecorder { sort_key, allowed_sources, model_providers, + cwd_filters, default_provider, archived, search_term, @@ -331,11 +419,12 @@ impl RolloutRecorder { SortDirection::Desc => { list_threads_from_files_desc( codex_home, - fs_page_size, + page_size.saturating_mul(2), cursor, sort_key, allowed_sources, model_providers, + cwd_filters, default_provider, archived, search_term, @@ -347,24 +436,39 @@ impl RolloutRecorder { if state_db_ctx.is_none() { // Keep legacy behavior when SQLite is unavailable: return filesystem results // at the requested page size. - return Ok(match sort_direction { - SortDirection::Asc => fs_page, - SortDirection::Desc => truncate_fs_page(fs_page, page_size, sort_key), - }); + return Ok(page_from_filesystem_scan( + fs_page, + sort_direction, + page_size, + sort_key, + )); } // Warm the DB by repairing every filesystem hit before querying SQLite. for item in &fs_page.items { - state_db::read_repair_rollout_path( - state_db_ctx.as_deref(), - item.thread_id, - Some(archived), - item.path.as_path(), - ) - .await; + if listing_has_metadata_filters { + state_db::reconcile_rollout( + state_db_ctx.as_deref(), + item.path.as_path(), + default_provider, + /*builder*/ None, + &[], + Some(archived), + /*new_thread_memory_mode*/ None, + ) + .await; + } else { + state_db::read_repair_rollout_path( + state_db_ctx.as_deref(), + item.thread_id, + Some(archived), + item.path.as_path(), + ) + .await; + } } - if let Some(db_page) = state_db::list_threads_db( + let db_page = state_db::list_threads_db( state_db_ctx.as_deref(), codex_home, page_size, @@ -373,20 +477,83 @@ impl RolloutRecorder { sort_direction, allowed_sources, model_providers, + cwd_filters, archived, search_term, ) - .await - { + .await; + if let Some(db_page) = db_page { + if search_term.is_some() && (!db_page.items.is_empty() || cursor.is_some()) { + for item in &db_page.items { + state_db::reconcile_rollout( + state_db_ctx.as_deref(), + item.rollout_path.as_path(), + default_provider, + /*builder*/ None, + &[], + Some(archived), + /*new_thread_memory_mode*/ None, + ) + .await; + } + if let Some(repaired_db_page) = state_db::list_threads_db( + state_db_ctx.as_deref(), + codex_home, + page_size, + cursor, + sort_key, + sort_direction, + allowed_sources, + model_providers, + cwd_filters, + archived, + search_term, + ) + .await + { + return Ok(repaired_db_page.into()); + } + return Ok(db_page.into()); + } + if listing_has_metadata_filters { + for item in &db_page.items { + state_db::reconcile_rollout( + state_db_ctx.as_deref(), + item.rollout_path.as_path(), + default_provider, + /*builder*/ None, + &[], + Some(archived), + /*new_thread_memory_mode*/ None, + ) + .await; + } + return Ok(page_from_filesystem_scan( + fs_page, + sort_direction, + page_size, + sort_key, + )); + } return Ok(db_page.into()); } + if listing_has_metadata_filters { + return Ok(page_from_filesystem_scan( + fs_page, + sort_direction, + page_size, + sort_key, + )); + } // If SQLite listing still fails, return the filesystem page rather than failing the list. tracing::error!("Falling back on rollout system"); tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back"); - Ok(match sort_direction { - SortDirection::Asc => fs_page, - SortDirection::Desc => truncate_fs_page(fs_page, page_size, sort_key), - }) + Ok(page_from_filesystem_scan( + fs_page, + sort_direction, + page_size, + sort_key, + )) } /// Find the newest recorded thread path, optionally filtering to a matching cwd. @@ -403,6 +570,7 @@ impl RolloutRecorder { ) -> std::io::Result> { let codex_home = config.codex_home(); let state_db_ctx = state_db::get_state_db(config).await; + let cwd_filter = filter_cwd.map(Path::to_path_buf); if state_db_ctx.is_some() { let mut db_cursor = cursor.cloned(); loop { @@ -415,6 +583,7 @@ impl RolloutRecorder { SortDirection::Desc, allowed_sources, model_providers, + cwd_filter.as_ref().map(std::slice::from_ref), /*archived*/ false, /*search_term*/ None, ) @@ -443,6 +612,7 @@ impl RolloutRecorder { sort_key, allowed_sources, model_providers, + cwd_filter.as_ref().map(std::slice::from_ref), default_provider, ) .await?; @@ -796,6 +966,18 @@ fn truncate_fs_page( page } +fn page_from_filesystem_scan( + page: ThreadsPage, + sort_direction: SortDirection, + page_size: usize, + sort_key: ThreadSortKey, +) -> ThreadsPage { + match sort_direction { + SortDirection::Asc => page, + SortDirection::Desc => truncate_fs_page(page, page_size, sort_key), + } +} + #[allow(clippy::too_many_arguments)] async fn list_threads_from_files_desc( codex_home: &Path, @@ -804,6 +986,7 @@ async fn list_threads_from_files_desc( sort_key: ThreadSortKey, allowed_sources: &[SessionSource], model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, default_provider: &str, archived: bool, search_term: Option<&str>, @@ -823,6 +1006,7 @@ async fn list_threads_from_files_desc( sort_key, allowed_sources, model_providers, + cwd_filters, default_provider, archived, ) @@ -864,6 +1048,7 @@ async fn list_threads_from_files_desc( sort_key, allowed_sources, model_providers, + cwd_filters, default_provider, archived, ) @@ -878,6 +1063,7 @@ async fn list_threads_from_files_desc_unfiltered( sort_key: ThreadSortKey, allowed_sources: &[SessionSource], model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, default_provider: &str, archived: bool, ) -> std::io::Result { @@ -891,6 +1077,7 @@ async fn list_threads_from_files_desc_unfiltered( ThreadListConfig { allowed_sources, model_providers, + cwd_filters, default_provider, layout: ThreadListLayout::Flat, }, @@ -904,6 +1091,7 @@ async fn list_threads_from_files_desc_unfiltered( sort_key, allowed_sources, model_providers, + cwd_filters, default_provider, ) .await @@ -918,6 +1106,7 @@ async fn list_threads_from_files_asc( sort_key: ThreadSortKey, allowed_sources: &[SessionSource], model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, default_provider: &str, archived: bool, search_term: Option<&str>, @@ -935,6 +1124,7 @@ async fn list_threads_from_files_asc( sort_key, allowed_sources, model_providers, + cwd_filters, default_provider, archived, /*search_term*/ None, diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index fa789d3a7..852bf6c6d 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -375,6 +375,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re SortDirection::Desc, &[], /*model_providers*/ None, + /*cwd_filters*/ None, default_provider.as_str(), /*search_term*/ None, ) @@ -391,6 +392,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re SortDirection::Desc, &[], /*model_providers*/ None, + /*cwd_filters*/ None, default_provider.as_str(), /*search_term*/ None, ) @@ -449,6 +451,7 @@ async fn list_threads_db_enabled_drops_missing_rollout_paths() -> std::io::Resul SortDirection::Desc, &[], /*model_providers*/ None, + /*cwd_filters*/ None, default_provider.as_str(), /*search_term*/ None, ) @@ -512,6 +515,7 @@ async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Resul SortDirection::Desc, &[], /*model_providers*/ None, + /*cwd_filters*/ None, default_provider.as_str(), /*search_term*/ None, ) @@ -527,6 +531,274 @@ async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Resul Ok(()) } +#[tokio::test] +async fn list_threads_state_db_only_skips_jsonl_repair_scan() -> std::io::Result<()> { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); + + let uuid = Uuid::from_u128(9012); + let ts = "2025-01-03T14-00-00"; + let day_dir = home.path().join("sessions/2025/01/03"); + fs::create_dir_all(&day_dir)?; + let path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl")); + let mut file = File::create(&path)?; + let meta = serde_json::json!({ + "timestamp": ts, + "type": "session_meta", + "payload": { + "id": uuid, + "timestamp": ts, + "cwd": home.path().display().to_string(), + "originator": "test_originator", + "cli_version": "test_version", + "source": "cli", + "model_provider": "test-provider", + }, + }); + writeln!(file, "{meta}")?; + let user_event = serde_json::json!({ + "timestamp": ts, + "type": "event_msg", + "payload": { + "type": "user_message", + "message": "Hello from user", + "kind": "plain", + }, + }); + writeln!(file, "{user_event}")?; + + let cwd_filters = [home.path().to_path_buf()]; + let state_db_only_page = RolloutRecorder::list_threads_from_state_db( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[], + /*model_providers*/ None, + /*cwd_filters*/ Some(cwd_filters.as_slice()), + config.model_provider_id.as_str(), + /*search_term*/ None, + ) + .await?; + assert_eq!(state_db_only_page.items.len(), 0); + + let repaired_page = RolloutRecorder::list_threads( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[], + /*model_providers*/ None, + /*cwd_filters*/ Some(cwd_filters.as_slice()), + config.model_provider_id.as_str(), + /*search_term*/ None, + ) + .await?; + assert_eq!(repaired_page.items.len(), 1); + + let repaired_state_db_only_page = RolloutRecorder::list_threads_from_state_db( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[], + /*model_providers*/ None, + /*cwd_filters*/ Some(cwd_filters.as_slice()), + config.model_provider_id.as_str(), + /*search_term*/ None, + ) + .await?; + assert_eq!(repaired_state_db_only_page.items.len(), 1); + Ok(()) +} + +#[tokio::test] +async fn list_threads_default_filter_returns_filesystem_scan_results() -> std::io::Result<()> { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + + let uuid = Uuid::from_u128(9013); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let real_path = write_session_file(home.path(), "2025-01-03T13-00-00", uuid)?; + let stale_cwd = home.path().join("stale-cwd"); + + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); + let created_at = chrono::Utc + .with_ymd_and_hms(2025, 1, 3, 13, 0, 0) + .single() + .expect("valid datetime"); + let mut builder = codex_state::ThreadMetadataBuilder::new( + thread_id, + real_path, + created_at, + SessionSource::Cli, + ); + builder.model_provider = Some(config.model_provider_id.clone()); + builder.cwd = stale_cwd.clone(); + let mut metadata = builder.build(config.model_provider_id.as_str()); + metadata.first_user_message = Some("Hello from user".to_string()); + runtime + .upsert_thread(&metadata) + .await + .expect("state db upsert should succeed"); + + let cwd_filters = [stale_cwd]; + let state_db_only_page = RolloutRecorder::list_threads_from_state_db( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[], + /*model_providers*/ None, + /*cwd_filters*/ Some(cwd_filters.as_slice()), + config.model_provider_id.as_str(), + /*search_term*/ None, + ) + .await?; + assert_eq!(state_db_only_page.items.len(), 1); + + let scanned_page = RolloutRecorder::list_threads( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[], + /*model_providers*/ None, + /*cwd_filters*/ Some(cwd_filters.as_slice()), + config.model_provider_id.as_str(), + /*search_term*/ None, + ) + .await?; + assert_eq!(scanned_page.items.len(), 0); + + let repaired_state_db_only_page = RolloutRecorder::list_threads_from_state_db( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[], + /*model_providers*/ None, + /*cwd_filters*/ Some(cwd_filters.as_slice()), + config.model_provider_id.as_str(), + /*search_term*/ None, + ) + .await?; + assert_eq!(repaired_state_db_only_page.items.len(), 0); + Ok(()) +} + +#[tokio::test] +async fn list_threads_search_repairs_stale_state_db_hits_before_returning() -> std::io::Result<()> { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + + let uuid = Uuid::from_u128(9014); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let real_path = write_session_file(home.path(), "2025-01-03T15-00-00", uuid)?; + + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); + let created_at = chrono::Utc + .with_ymd_and_hms(2025, 1, 3, 15, 0, 0) + .single() + .expect("valid datetime"); + let mut builder = codex_state::ThreadMetadataBuilder::new( + thread_id, + real_path, + created_at, + SessionSource::Cli, + ); + builder.model_provider = Some(config.model_provider_id.clone()); + builder.cwd = home.path().to_path_buf(); + let mut metadata = builder.build(config.model_provider_id.as_str()); + metadata.title = "needle stale title".to_string(); + metadata.first_user_message = Some("stale first user".to_string()); + runtime + .upsert_thread(&metadata) + .await + .expect("state db upsert should succeed"); + + let stale_state_db_only_page = RolloutRecorder::list_threads_from_state_db( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[], + /*model_providers*/ None, + /*cwd_filters*/ None, + config.model_provider_id.as_str(), + Some("needle"), + ) + .await?; + assert_eq!(stale_state_db_only_page.items.len(), 1); + + let scanned_page = RolloutRecorder::list_threads( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[], + /*model_providers*/ None, + /*cwd_filters*/ None, + config.model_provider_id.as_str(), + Some("needle"), + ) + .await?; + assert_eq!(scanned_page.items.len(), 0); + + let repaired_state_db_only_page = RolloutRecorder::list_threads_from_state_db( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[], + /*model_providers*/ None, + /*cwd_filters*/ None, + config.model_provider_id.as_str(), + Some("needle"), + ) + .await?; + assert_eq!(repaired_state_db_only_page.items.len(), 0); + Ok(()) +} + #[tokio::test] async fn resume_candidate_matches_cwd_reads_latest_turn_context() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); diff --git a/codex-rs/rollout/src/state_db.rs b/codex-rs/rollout/src/state_db.rs index 7d72429c9..ae87bff73 100644 --- a/codex-rs/rollout/src/state_db.rs +++ b/codex-rs/rollout/src/state_db.rs @@ -191,6 +191,7 @@ pub async fn list_threads_db( sort_direction: SortDirection, allowed_sources: &[SessionSource], model_providers: Option<&[String]>, + cwd_filters: Option<&[PathBuf]>, archived: bool, search_term: Option<&str>, ) -> Option { @@ -213,6 +214,12 @@ pub async fn list_threads_db( }) .collect(); let model_providers = model_providers.map(<[String]>::to_vec); + let normalized_cwd_filters = cwd_filters.map(|filters| { + filters + .iter() + .map(|cwd| normalize_cwd_for_state_db(cwd)) + .collect::>() + }); match ctx .list_threads( page_size, @@ -220,6 +227,7 @@ pub async fn list_threads_db( archived_only: archived, allowed_sources: allowed_sources.as_slice(), model_providers: model_providers.as_deref(), + cwd_filters: normalized_cwd_filters.as_deref(), anchor: anchor.as_ref(), sort_key: match sort_key { ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt, diff --git a/codex-rs/rollout/src/tests.rs b/codex-rs/rollout/src/tests.rs index d9619a598..5769a3d57 100644 --- a/codex-rs/rollout/src/tests.rs +++ b/codex-rs/rollout/src/tests.rs @@ -540,6 +540,7 @@ async fn test_list_conversations_latest_first() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -689,6 +690,7 @@ async fn test_pagination_cursor() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -756,6 +758,7 @@ async fn test_pagination_cursor() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -823,6 +826,7 @@ async fn test_pagination_cursor() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -877,6 +881,7 @@ async fn test_list_threads_scans_past_head_for_user_event() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -910,6 +915,7 @@ async fn test_get_thread_contents() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -1000,6 +1006,7 @@ async fn test_base_instructions_missing_in_meta_defaults_to_null() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -1043,6 +1050,7 @@ async fn test_base_instructions_present_in_meta_is_preserved() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -1101,6 +1109,7 @@ async fn test_created_at_sort_uses_file_mtime_for_updated_at() -> Result<()> { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await?; @@ -1186,6 +1195,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> { ThreadSortKey::UpdatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await?; @@ -1247,6 +1257,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -1315,6 +1326,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -1363,6 +1375,7 @@ async fn test_source_filter_excludes_non_matching_sessions() { ThreadSortKey::CreatedAt, INTERACTIVE_SESSION_SOURCES.as_slice(), Some(provider_filter.as_slice()), + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -1385,6 +1398,7 @@ async fn test_source_filter_excludes_non_matching_sessions() { ThreadSortKey::CreatedAt, NO_SOURCE_FILTER, /*model_providers*/ None, + /*cwd_filters*/ None, TEST_PROVIDER, ) .await @@ -1447,6 +1461,7 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<( ThreadSortKey::CreatedAt, NO_SOURCE_FILTER, Some(openai_filter.as_slice()), + /*cwd_filters*/ None, "openai", ) .await?; @@ -1467,6 +1482,7 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<( ThreadSortKey::CreatedAt, NO_SOURCE_FILTER, Some(beta_filter.as_slice()), + /*cwd_filters*/ None, "openai", ) .await?; @@ -1486,6 +1502,7 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<( ThreadSortKey::CreatedAt, NO_SOURCE_FILTER, Some(unknown_filter.as_slice()), + /*cwd_filters*/ None, "openai", ) .await?; @@ -1498,6 +1515,7 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<( ThreadSortKey::CreatedAt, NO_SOURCE_FILTER, /*model_providers*/ None, + /*cwd_filters*/ None, "openai", ) .await?; diff --git a/codex-rs/state/migrations/0027_threads_cwd_sort_indexes.sql b/codex-rs/state/migrations/0027_threads_cwd_sort_indexes.sql new file mode 100644 index 000000000..8b1e5c58f --- /dev/null +++ b/codex-rs/state/migrations/0027_threads_cwd_sort_indexes.sql @@ -0,0 +1,2 @@ +CREATE INDEX idx_threads_archived_cwd_created_at_ms ON threads(archived, cwd, created_at_ms DESC, id DESC); +CREATE INDEX idx_threads_archived_cwd_updated_at_ms ON threads(archived, cwd, updated_at_ms DESC, id DESC); diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index eee3a2c7b..b9cf432e4 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -175,6 +175,7 @@ LEFT JOIN jobs archived_only: false, allowed_sources, model_providers: None, + cwd_filters: None, anchor: None, sort_key: SortKey::UpdatedAt, sort_direction: SortDirection::Desc, diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 70fea73ff..8a3bcee5b 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -359,6 +359,7 @@ ON CONFLICT(child_thread_id) DO NOTHING archived_only, allowed_sources, model_providers, + cwd_filters: None, anchor: None, sort_key: crate::SortKey::UpdatedAt, sort_direction: SortDirection::Desc, @@ -437,6 +438,7 @@ ON CONFLICT(child_thread_id) DO NOTHING archived_only, allowed_sources, model_providers, + cwd_filters: None, anchor, sort_key, sort_direction: SortDirection::Desc, @@ -1002,6 +1004,7 @@ pub struct ThreadFilterOptions<'a> { pub archived_only: bool, pub allowed_sources: &'a [String], pub model_providers: Option<&'a [String]>, + pub cwd_filters: Option<&'a [PathBuf]>, pub anchor: Option<&'a crate::Anchor>, pub sort_key: SortKey, pub sort_direction: SortDirection, @@ -1016,6 +1019,7 @@ pub(super) fn push_thread_filters<'a>( archived_only, allowed_sources, model_providers, + cwd_filters, anchor, sort_key, sort_direction, @@ -1046,6 +1050,20 @@ pub(super) fn push_thread_filters<'a>( } separated.push_unseparated(")"); } + match cwd_filters { + Some([]) => { + builder.push(" AND 1 = 0"); + } + Some(cwd_filters) => { + builder.push(" AND threads.cwd IN ("); + let mut separated = builder.separated(", "); + for cwd in cwd_filters { + separated.push_bind(cwd.display().to_string()); + } + separated.push_unseparated(")"); + } + None => {} + } if let Some(search_term) = search_term { builder.push(" AND instr(threads.title, "); builder.push_bind(search_term); @@ -1188,6 +1206,7 @@ mod tests { archived_only: false, allowed_sources: &[], model_providers: Some(&model_providers), + cwd_filters: None, anchor: Some(&anchor), sort_key: SortKey::UpdatedAt, sort_direction: SortDirection::Asc, @@ -1214,6 +1233,7 @@ mod tests { archived_only: false, allowed_sources: &[], model_providers: Some(&model_providers), + cwd_filters: None, anchor: page.next_anchor.as_ref(), sort_key: SortKey::UpdatedAt, sort_direction: SortDirection::Asc, @@ -1228,6 +1248,77 @@ mod tests { assert_eq!(page.next_anchor, None); } + #[tokio::test] + async fn list_threads_filters_by_cwd() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("state db should initialize"); + let first_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000101").expect("valid thread id"); + let second_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000102").expect("valid thread id"); + let other_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000103").expect("valid thread id"); + let first_cwd = codex_home.join("first"); + let second_cwd = codex_home.join("second"); + let other_cwd = codex_home.join("other"); + + for (thread_id, cwd, updated_at) in [ + (first_id, first_cwd.clone(), 1_700_000_100), + (second_id, second_cwd.clone(), 1_700_000_300), + (other_id, other_cwd, 1_700_000_500), + ] { + let mut metadata = test_thread_metadata(&codex_home, thread_id, cwd); + metadata.updated_at = + DateTime::::from_timestamp(updated_at, 0).expect("valid timestamp"); + runtime + .upsert_thread(&metadata) + .await + .expect("thread insert should succeed"); + } + + let cwd_filters = vec![first_cwd, second_cwd]; + let page = runtime + .list_threads( + /*page_size*/ 10, + ThreadFilterOptions { + archived_only: false, + allowed_sources: &[], + model_providers: None, + cwd_filters: Some(cwd_filters.as_slice()), + anchor: None, + sort_key: SortKey::UpdatedAt, + sort_direction: SortDirection::Desc, + search_term: None, + }, + ) + .await + .expect("list should succeed"); + + let ids = page.items.iter().map(|item| item.id).collect::>(); + assert_eq!(ids, vec![second_id, first_id]); + + let page = runtime + .list_threads( + /*page_size*/ 10, + ThreadFilterOptions { + archived_only: false, + allowed_sources: &[], + model_providers: None, + cwd_filters: Some(&[]), + anchor: None, + sort_key: SortKey::UpdatedAt, + sort_direction: SortDirection::Desc, + search_term: None, + }, + ) + .await + .expect("list with empty cwd filters should succeed"); + + assert_eq!(page.items, Vec::new()); + } + #[tokio::test] async fn apply_rollout_items_restores_memory_mode_from_session_meta() { let codex_home = unique_temp_dir(); diff --git a/codex-rs/thread-store/src/local/archive_thread.rs b/codex-rs/thread-store/src/local/archive_thread.rs index dc07e4962..0dd65df72 100644 --- a/codex-rs/thread-store/src/local/archive_thread.rs +++ b/codex-rs/thread-store/src/local/archive_thread.rs @@ -103,8 +103,10 @@ mod tests { sort_direction: crate::SortDirection::Desc, allowed_sources: Vec::new(), model_providers: None, + cwd_filters: None, archived: true, search_term: None, + use_state_db_only: false, }) .await .expect("archived listing"); diff --git a/codex-rs/thread-store/src/local/list_threads.rs b/codex-rs/thread-store/src/local/list_threads.rs index a0c2d4131..d68ebd47f 100644 --- a/codex-rs/thread-store/src/local/list_threads.rs +++ b/codex-rs/thread-store/src/local/list_threads.rs @@ -68,7 +68,35 @@ async fn list_rollout_threads( sort_key: codex_rollout::ThreadSortKey, sort_direction: codex_rollout::SortDirection, ) -> ThreadStoreResult { - let page = if params.archived { + let page = if params.use_state_db_only && params.archived { + RolloutRecorder::list_archived_threads_from_state_db( + config, + params.page_size, + cursor, + sort_key, + sort_direction, + params.allowed_sources.as_slice(), + params.model_providers.as_deref(), + params.cwd_filters.as_deref(), + config.model_provider_id.as_str(), + params.search_term.as_deref(), + ) + .await + } else if params.use_state_db_only { + RolloutRecorder::list_threads_from_state_db( + config, + params.page_size, + cursor, + sort_key, + sort_direction, + params.allowed_sources.as_slice(), + params.model_providers.as_deref(), + params.cwd_filters.as_deref(), + config.model_provider_id.as_str(), + params.search_term.as_deref(), + ) + .await + } else if params.archived { RolloutRecorder::list_archived_threads( config, params.page_size, @@ -77,6 +105,7 @@ async fn list_rollout_threads( sort_direction, params.allowed_sources.as_slice(), params.model_providers.as_deref(), + params.cwd_filters.as_deref(), config.model_provider_id.as_str(), params.search_term.as_deref(), ) @@ -90,6 +119,7 @@ async fn list_rollout_threads( sort_direction, params.allowed_sources.as_slice(), params.model_providers.as_deref(), + params.cwd_filters.as_deref(), config.model_provider_id.as_str(), params.search_term.as_deref(), ) @@ -140,8 +170,10 @@ mod tests { sort_direction: SortDirection::Desc, allowed_sources: Vec::new(), model_providers: None, + cwd_filters: None, archived: false, search_term: None, + use_state_db_only: false, }) .await .expect("thread listing"); @@ -196,8 +228,10 @@ mod tests { sort_direction: SortDirection::Desc, allowed_sources: Vec::new(), model_providers: None, + cwd_filters: None, archived: false, search_term: Some("needle".to_string()), + use_state_db_only: true, }) .await .expect("thread listing"); @@ -233,8 +267,10 @@ mod tests { sort_direction: SortDirection::Desc, allowed_sources: Vec::new(), model_providers: None, + cwd_filters: None, archived: false, search_term: None, + use_state_db_only: false, }) .await .expect("active listing"); @@ -246,8 +282,10 @@ mod tests { sort_direction: SortDirection::Desc, allowed_sources: Vec::new(), model_providers: None, + cwd_filters: None, archived: true, search_term: None, + use_state_db_only: false, }) .await .expect("archived listing"); @@ -295,8 +333,10 @@ mod tests { sort_direction: SortDirection::Desc, allowed_sources: vec![SessionSource::Cli], model_providers: Some(vec!["test-provider".to_string()]), + cwd_filters: None, archived: false, search_term: None, + use_state_db_only: false, }) .await .expect("thread listing"); @@ -329,8 +369,10 @@ mod tests { sort_direction: SortDirection::Desc, allowed_sources: Vec::new(), model_providers: None, + cwd_filters: None, archived: false, search_term: None, + use_state_db_only: false, }) .await .expect_err("invalid cursor should fail"); diff --git a/codex-rs/thread-store/src/remote/list_threads.rs b/codex-rs/thread-store/src/remote/list_threads.rs index 0ed1f44a1..cbccb792f 100644 --- a/codex-rs/thread-store/src/remote/list_threads.rs +++ b/codex-rs/thread-store/src/remote/list_threads.rs @@ -30,8 +30,15 @@ pub(super) async fn list_threads( model_provider_filter: params .model_providers .map(|values| proto::ModelProviderFilter { values }), + cwd_filter: params.cwd_filters.map(|values| proto::CwdFilter { + values: values + .into_iter() + .map(|cwd| cwd.display().to_string()) + .collect(), + }), archived: params.archived, search_term: params.search_term, + use_state_db_only: params.use_state_db_only, }; let response = store @@ -91,12 +98,19 @@ mod tests { ); assert_eq!(request.archived, true); assert_eq!(request.search_term.as_deref(), Some("needle")); + assert!(request.use_state_db_only); assert_eq!( request.model_provider_filter, Some(proto::ModelProviderFilter { values: vec!["openai".to_string()], }) ); + assert_eq!( + request.cwd_filter, + Some(proto::CwdFilter { + values: vec!["/workspace".to_string()], + }) + ); assert_eq!(request.allowed_sources.len(), 1); assert_eq!( proto::SessionSourceKind::try_from(request.allowed_sources[0].kind), @@ -164,8 +178,10 @@ mod tests { sort_direction: crate::SortDirection::Desc, allowed_sources: vec![SessionSource::Cli], model_providers: Some(vec!["openai".to_string()]), + cwd_filters: Some(vec![PathBuf::from("/workspace")]), archived: true, search_term: Some("needle".to_string()), + use_state_db_only: true, }) .await .expect("list threads"); diff --git a/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.proto b/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.proto index f217027fc..267cb9320 100644 --- a/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.proto +++ b/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.proto @@ -14,12 +14,18 @@ message ListThreadsRequest { optional ModelProviderFilter model_provider_filter = 5; bool archived = 6; optional string search_term = 7; + optional CwdFilter cwd_filter = 8; + bool use_state_db_only = 9; } message ModelProviderFilter { repeated string values = 1; } +message CwdFilter { + repeated string values = 1; +} + enum ThreadSortKey { THREAD_SORT_KEY_CREATED_AT = 0; THREAD_SORT_KEY_UPDATED_AT = 1; diff --git a/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.rs b/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.rs index 6d0e41026..73262bc4a 100644 --- a/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.rs +++ b/codex-rs/thread-store/src/remote/proto/codex.thread_store.v1.rs @@ -17,12 +17,21 @@ pub struct ListThreadsRequest { pub archived: bool, #[prost(string, optional, tag = "7")] pub search_term: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, optional, tag = "8")] + pub cwd_filter: ::core::option::Option, + #[prost(bool, tag = "9")] + pub use_state_db_only: bool, } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ModelProviderFilter { #[prost(string, repeated, tag = "1")] pub values: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CwdFilter { + #[prost(string, repeated, tag = "1")] + pub values: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListThreadsResponse { #[prost(message, repeated, tag = "1")] diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 2145adc3d..1f12bea64 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -128,10 +128,15 @@ pub struct ListThreadsParams { /// Optional model provider filter. `None` means implementation default, while an empty vector /// means all providers. pub model_providers: Option>, + /// Optional cwd filters. `None` means all working directories, while an empty vector matches no + /// threads. + pub cwd_filters: Option>, /// Whether archived threads should be listed instead of active threads. pub archived: bool, /// Optional substring/full-text search term for thread title/preview. pub search_term: Option, + /// Return directly from the state DB without scanning JSONL rollouts to repair metadata. + pub use_state_db_only: bool, } /// A page of stored thread records. diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 7e33f2e8a..8814cddda 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -27,6 +27,7 @@ use codex_app_server_protocol::Account as AppServerAccount; use codex_app_server_protocol::AuthMode as AppServerAuthMode; use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::Thread as AppServerThread; +use codex_app_server_protocol::ThreadListCwdFilter; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadSortKey as AppServerThreadSortKey; use codex_app_server_protocol::ThreadSourceKind; @@ -523,6 +524,7 @@ async fn lookup_session_target_by_name_with_app_server( source_kinds: Some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]), archived: Some(false), cwd: None, + use_state_db_only: false, search_term: Some(name.to_string()), }) .await?; @@ -614,7 +616,8 @@ fn latest_session_lookup_params( source_kinds: (!include_non_interactive) .then_some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]), archived: Some(false), - cwd: cwd_filter.map(|cwd| cwd.to_string_lossy().to_string()), + cwd: cwd_filter.map(|cwd| ThreadListCwdFilter::One(cwd.to_string_lossy().to_string())), + use_state_db_only: false, search_term: None, } } @@ -1883,7 +1886,10 @@ mod tests { ); assert_eq!(params.model_providers, Some(vec![config.model_provider_id])); - assert_eq!(params.cwd, Some(cwd.to_string_lossy().to_string())); + assert_eq!( + params.cwd, + Some(ThreadListCwdFilter::One(cwd.to_string_lossy().to_string())) + ); Ok(()) } @@ -1918,7 +1924,10 @@ mod tests { ); assert_eq!(params.model_providers, None); - assert_eq!(params.cwd.as_deref(), Some("repo/on/server")); + assert_eq!( + params.cwd, + Some(ThreadListCwdFilter::One(String::from("repo/on/server"))) + ); Ok(()) } diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index 94d5401d2..fe0825ccd 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -14,6 +14,7 @@ use crate::tui::TuiEvent; use chrono::DateTime; use chrono::Utc; use codex_app_server_protocol::Thread; +use codex_app_server_protocol::ThreadListCwdFilter; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadSortKey; use codex_app_server_protocol::ThreadSourceKind; @@ -1002,7 +1003,8 @@ fn thread_list_params( source_kinds: (!include_non_interactive) .then_some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]), archived: Some(false), - cwd: cwd_filter.map(|cwd| cwd.to_string_lossy().to_string()), + cwd: cwd_filter.map(|cwd| ThreadListCwdFilter::One(cwd.to_string_lossy().into_owned())), + use_state_db_only: false, search_term: None, } } @@ -1573,7 +1575,10 @@ mod tests { params.source_kinds, Some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]) ); - assert_eq!(params.cwd.as_deref(), Some("repo/on/server")); + assert_eq!( + params.cwd, + Some(ThreadListCwdFilter::One(String::from("repo/on/server"))) + ); } #[test]