From ac0bff27e714da0243f656a577dc64679c807af2 Mon Sep 17 00:00:00 2001 From: Francis Chalissery Date: Thu, 21 May 2026 11:52:24 -0700 Subject: [PATCH] [codex] Add rollout-backed thread content search (#23519) ## Summary - add experimental `thread/search` for local rollout-backed thread search using `rg` over JSONL rollouts - return search-specific result rows that carry a match `snippet` instead of storing preview data on `StoredThread` or ordinary `Thread` responses - keep `thread/list` separate from full-content search and document the new app-server surface ## Testing - `cargo test -p codex-app-server-protocol` - `cargo test -p codex-app-server thread_search_returns_content_matches -- --nocapture` --- codex-rs/Cargo.lock | 1 + .../schema/json/ServerNotification.json | 2 +- .../codex_app_server_protocol.schemas.json | 15 ++ .../codex_app_server_protocol.v2.schemas.json | 15 ++ .../typescript/v2/ThreadSearchResult.ts | 6 + .../schema/typescript/v2/index.ts | 1 + .../src/protocol/common.rs | 6 + .../src/protocol/v2/thread.rs | 51 ++++ codex-rs/app-server/src/message_processor.rs | 3 + codex-rs/app-server/src/request_processors.rs | 4 + .../request_processors/thread_processor.rs | 134 ++++++++++ .../app-server/tests/common/mcp_process.rs | 10 + .../app-server/tests/suite/v2/thread_list.rs | 156 +++++++++++ codex-rs/rollout/src/lib.rs | 3 + codex-rs/rollout/src/search.rs | 251 ++++++++++++++++++ codex-rs/thread-store/Cargo.toml | 1 + codex-rs/thread-store/src/lib.rs | 3 + .../thread-store/src/local/list_threads.rs | 2 +- codex-rs/thread-store/src/local/mod.rs | 10 + .../thread-store/src/local/search_threads.rs | 217 +++++++++++++++ codex-rs/thread-store/src/store.rs | 12 + codex-rs/thread-store/src/types.rs | 34 +++ 22 files changed, 935 insertions(+), 2 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/ThreadSearchResult.ts create mode 100644 codex-rs/rollout/src/search.rs create mode 100644 
codex-rs/thread-store/src/local/search_threads.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index c13f25c84..3fa4d8315 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -3744,6 +3744,7 @@ dependencies = [ "async-trait", "chrono", "codex-git-utils", + "codex-install-context", "codex-protocol", "codex-rollout", "codex-state", diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 90899cb15..dfb999cf3 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -6546,4 +6546,4 @@ } ], "title": "ServerNotification" -} \ No newline at end of file +} diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index dda285476..155afbb29 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -17440,6 +17440,21 @@ "title": "ThreadRollbackResponse", "type": "object" }, + "ThreadSearchResult": { + "properties": { + "snippet": { + "type": "string" + }, + "thread": { + "$ref": "#/definitions/v2/Thread" + } + }, + "required": [ + "snippet", + "thread" + ], + "type": "object" + }, "ThreadSetNameParams": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 3a23ba3a6..a636cf1de 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -15264,6 +15264,21 @@ "title": "ThreadRollbackResponse", "type": "object" 
}, + "ThreadSearchResult": { + "properties": { + "snippet": { + "type": "string" + }, + "thread": { + "$ref": "#/definitions/Thread" + } + }, + "required": [ + "snippet", + "thread" + ], + "type": "object" + }, "ThreadSetNameParams": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSearchResult.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSearchResult.ts new file mode 100644 index 000000000..bdd83b850 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSearchResult.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { Thread } from "./Thread"; + +export type ThreadSearchResult = { thread: Thread, snippet: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index e9b8993c2..d5ae15e8e 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -401,6 +401,7 @@ export type { ThreadResumeParams } from "./ThreadResumeParams"; export type { ThreadResumeResponse } from "./ThreadResumeResponse"; export type { ThreadRollbackParams } from "./ThreadRollbackParams"; export type { ThreadRollbackResponse } from "./ThreadRollbackResponse"; +export type { ThreadSearchResult } from "./ThreadSearchResult"; export type { ThreadSetNameParams } from "./ThreadSetNameParams"; export type { ThreadSetNameResponse } from "./ThreadSetNameResponse"; export type { ThreadSettings } from "./ThreadSettings"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 76334e8a6..dd1971116 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ 
b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -569,6 +569,12 @@ client_request_definitions! { serialization: None, response: v2::ThreadListResponse, }, + #[experimental("thread/search")] + ThreadSearch => "thread/search" { + params: v2::ThreadSearchParams, + serialization: None, + response: v2::ThreadSearchResponse, + }, ThreadLoadedList => "thread/loaded/list" { params: v2::ThreadLoadedListParams, serialization: None, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index 35dbca288..01d8bc2a6 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -973,6 +973,34 @@ pub struct ThreadListParams { pub search_term: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadSearchParams { + /// Opaque pagination cursor returned by a previous call. + #[ts(optional = nullable)] + pub cursor: Option, + /// Optional page size; defaults to a reasonable server-side value. + #[ts(optional = nullable)] + pub limit: Option, + /// Optional sort key; defaults to created_at. + #[ts(optional = nullable)] + pub sort_key: Option, + /// Optional sort direction; defaults to descending (newest first). + #[ts(optional = nullable)] + pub sort_direction: Option, + /// Optional source filter; when set, only sessions from these source kinds + /// are returned. When omitted or empty, defaults to interactive sources. + #[ts(optional = nullable)] + pub source_kinds: Option>, + /// Optional archived filter; when set to true, only archived threads are returned. + /// If false or null, only non-archived threads are returned. + #[ts(optional = nullable)] + pub archived: Option, + /// Required substring/full-text query for thread search. 
+ pub search_term: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)] #[serde(untagged)] pub enum ThreadListCwdFilter { @@ -1029,6 +1057,29 @@ pub struct ThreadListResponse { pub backwards_cursor: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadSearchResult { + pub thread: Thread, + pub snippet: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadSearchResponse { + pub data: Vec, + /// Opaque cursor to pass to the next call to continue after the last item. + /// if None, there are no more items to return. + pub next_cursor: Option, + /// Opaque cursor to pass as `cursor` when reversing `sortDirection`. + /// This is only populated when the page contains at least one thread. + /// Use it with the opposite `sortDirection`; for timestamp sorts it anchors + /// at the start of the page timestamp so same-second updates are not skipped. + pub backwards_cursor: Option, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 9c10a3340..0e1f019de 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -1072,6 +1072,9 @@ impl MessageProcessor { ClientRequest::ThreadList { params, .. } => { self.thread_processor.thread_list(params).await } + ClientRequest::ThreadSearch { params, .. } => { + self.thread_processor.thread_search(params).await + } ClientRequest::ThreadLoadedList { params, .. 
} => { self.thread_processor.thread_loaded_list(params).await } diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 311ab29d5..d0958e80a 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -214,6 +214,9 @@ use codex_app_server_protocol::ThreadRealtimeStopResponse; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadRollbackParams; +use codex_app_server_protocol::ThreadSearchParams; +use codex_app_server_protocol::ThreadSearchResponse; +use codex_app_server_protocol::ThreadSearchResult; use codex_app_server_protocol::ThreadSetNameParams; use codex_app_server_protocol::ThreadSetNameResponse; use codex_app_server_protocol::ThreadSettings; @@ -409,6 +412,7 @@ use codex_thread_store::ListThreadsParams as StoreListThreadsParams; use codex_thread_store::LocalThreadStore; use codex_thread_store::ReadThreadByRolloutPathParams as StoreReadThreadByRolloutPathParams; use codex_thread_store::ReadThreadParams as StoreReadThreadParams; +use codex_thread_store::SearchThreadsParams as StoreSearchThreadsParams; use codex_thread_store::SortDirection as StoreSortDirection; use codex_thread_store::StoredThread; use codex_thread_store::ThreadMetadataPatch as StoreThreadMetadataPatch; diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 6b6e1729a..1766c971b 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -589,6 +589,15 @@ impl ThreadRequestProcessor { .map(|response| Some(response.into())) } + pub(crate) async fn thread_search( + &self, + params: ThreadSearchParams, + ) -> Result, JSONRPCErrorError> { + self.thread_search_response_inner(params) + .await + .map(|response| 
Some(response.into())) + } + pub(crate) async fn thread_loaded_list( &self, params: ThreadLoadedListParams, @@ -1861,6 +1870,131 @@ impl ThreadRequestProcessor { }) } + async fn thread_search_response_inner( + &self, + params: ThreadSearchParams, + ) -> Result { + let ThreadSearchParams { + cursor, + limit, + sort_key, + sort_direction, + source_kinds, + archived, + search_term, + } = params; + let search_term = search_term.trim().to_string(); + let search_term = (!search_term.is_empty()) + .then_some(search_term) + .ok_or_else(|| invalid_request("thread/search requires a non-empty searchTerm"))?; + let requested_page_size = limit + .map(|value| value as usize) + .unwrap_or(THREAD_LIST_DEFAULT_LIMIT) + .clamp(1, THREAD_LIST_MAX_LIMIT); + let store_sort_key = match sort_key.unwrap_or(ThreadSortKey::CreatedAt) { + ThreadSortKey::CreatedAt => StoreThreadSortKey::CreatedAt, + ThreadSortKey::UpdatedAt => StoreThreadSortKey::UpdatedAt, + }; + let store_sort_direction = sort_direction.unwrap_or(SortDirection::Desc); + let (allowed_sources, source_kind_filter) = compute_source_filters(source_kinds); + let mut cursor_obj = cursor; + let mut last_cursor = cursor_obj.clone(); + let mut remaining = requested_page_size; + let mut search_results = Vec::with_capacity(requested_page_size); + let mut next_cursor = None; + + while remaining > 0 { + let page = self + .thread_store + .search_threads(StoreSearchThreadsParams { + page_size: remaining.min(THREAD_LIST_MAX_LIMIT), + cursor: cursor_obj.clone(), + sort_key: store_sort_key, + sort_direction: match store_sort_direction { + SortDirection::Asc => StoreSortDirection::Asc, + SortDirection::Desc => StoreSortDirection::Desc, + }, + allowed_sources: allowed_sources.clone(), + archived: archived.unwrap_or(false), + search_term: search_term.clone(), + }) + .await + .map_err(thread_store_list_error)?; + + for result in page.items { + let source = with_thread_spawn_agent_metadata( + result.thread.source.clone(), + 
result.thread.agent_nickname.clone(), + result.thread.agent_role.clone(), + ); + if source_kind_filter + .as_ref() + .is_none_or(|filter| source_kind_matches(&source, filter)) + { + search_results.push(result); + if search_results.len() >= requested_page_size { + break; + } + } + } + + remaining = requested_page_size.saturating_sub(search_results.len()); + next_cursor = page.next_cursor; + if remaining == 0 { + break; + } + + let Some(cursor_val) = next_cursor.clone() else { + break; + }; + if last_cursor.as_ref() == Some(&cursor_val) { + next_cursor = None; + break; + } + last_cursor = Some(cursor_val.clone()); + cursor_obj = Some(cursor_val); + } + + let backwards_cursor = search_results.first().and_then(|result| { + thread_backwards_cursor_for_sort_key( + &result.thread, + store_sort_key, + store_sort_direction, + ) + }); + let fallback_provider = self.config.model_provider_id.clone(); + let mut results = Vec::with_capacity(search_results.len()); + let mut status_ids = Vec::with_capacity(search_results.len()); + for result in search_results { + let (thread, _) = thread_from_stored_thread( + result.thread, + fallback_provider.as_str(), + &self.config.cwd, + ); + status_ids.push(thread.id.clone()); + results.push((thread, result.snippet)); + } + let statuses = self + .thread_watch_manager + .loaded_statuses_for_threads(status_ids) + .await; + let data = results + .into_iter() + .map(|(mut thread, snippet)| { + if let Some(status) = statuses.get(&thread.id) { + thread.status = status.clone(); + } + ThreadSearchResult { thread, snippet } + }) + .collect(); + + Ok(ThreadSearchResponse { + data, + next_cursor, + backwards_cursor, + }) + } + async fn thread_loaded_list_response_inner( &self, params: ThreadLoadedListParams, diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index f9c24c226..993ffa35f 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ 
b/codex-rs/app-server/tests/common/mcp_process.rs @@ -88,6 +88,7 @@ use codex_app_server_protocol::ThreadRealtimeStartParams; use codex_app_server_protocol::ThreadRealtimeStopParams; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadRollbackParams; +use codex_app_server_protocol::ThreadSearchParams; use codex_app_server_protocol::ThreadSetNameParams; use codex_app_server_protocol::ThreadSettingsUpdateParams; use codex_app_server_protocol::ThreadShellCommandParams; @@ -508,6 +509,15 @@ impl McpProcess { self.send_request("thread/list", params).await } + /// Send a `thread/search` JSON-RPC request. + pub async fn send_thread_search_request( + &mut self, + params: ThreadSearchParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/search", params).await + } + /// Send a `thread/loaded/list` JSON-RPC request. pub async fn send_thread_loaded_list_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 80254d8f4..bfb1d4f5e 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -17,6 +17,7 @@ use codex_app_server_protocol::SessionSource; use codex_app_server_protocol::SortDirection; use codex_app_server_protocol::ThreadListCwdFilter; use codex_app_server_protocol::ThreadListResponse; +use codex_app_server_protocol::ThreadSearchResponse; use codex_app_server_protocol::ThreadSortKey; use codex_app_server_protocol::ThreadSourceKind; use codex_app_server_protocol::ThreadStartParams; @@ -660,6 +661,161 @@ sqlite = true Ok(()) } +#[tokio::test] +async fn thread_search_returns_content_matches() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + let older_match = create_fake_rollout( + codex_home.path(), + "2025-01-02T10-00-00", + "2025-01-02T10:00:00Z", + "match: needle", + 
Some("mock_provider"), + /*git_info*/ None, + )?; + let _non_match = create_fake_rollout( + codex_home.path(), + "2025-01-02T11-00-00", + "2025-01-02T11:00:00Z", + "no hit here", + Some("mock_provider"), + /*git_info*/ None, + )?; + let newer_match = create_fake_rollout( + codex_home.path(), + "2025-01-02T12-00-00", + "2025-01-02T12:00:00Z", + "needle suffix", + Some("mock_provider"), + /*git_info*/ None, + )?; + + let mut mcp = init_mcp(codex_home.path()).await?; + let request_id = mcp + .send_thread_search_request(codex_app_server_protocol::ThreadSearchParams { + cursor: None, + limit: Some(10), + sort_key: None, + sort_direction: None, + source_kinds: None, + archived: None, + search_term: "needle".to_string(), + }) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let ThreadSearchResponse { + data, next_cursor, .. + } = to_response::(resp)?; + + assert_eq!(next_cursor, None); + let ids: Vec<_> = data + .iter() + .map(|result| result.thread.id.as_str()) + .collect(); + assert_eq!(ids, vec![newer_match, older_match]); + assert_eq!(data[0].snippet, "needle suffix"); + + Ok(()) +} + +#[tokio::test] +async fn thread_search_matches_json_escaped_content() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + let search_term = r#"quoted "needle" \ path"#; + let thread_id = create_fake_rollout( + codex_home.path(), + "2025-01-02T10-00-00", + "2025-01-02T10:00:00Z", + search_term, + Some("mock_provider"), + /*git_info*/ None, + )?; + + let mut mcp = init_mcp(codex_home.path()).await?; + let request_id = mcp + .send_thread_search_request(codex_app_server_protocol::ThreadSearchParams { + cursor: None, + limit: Some(10), + sort_key: None, + sort_direction: None, + source_kinds: None, + archived: None, + search_term: search_term.to_string(), + }) + .await?; + let resp: JSONRPCResponse = timeout( + 
DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let ThreadSearchResponse { data, .. } = to_response::(resp)?; + + assert_eq!(data.len(), 1); + assert_eq!(data[0].thread.id, thread_id); + assert_eq!(data[0].snippet, search_term); + + Ok(()) +} + +#[tokio::test] +async fn thread_search_filters_by_source_kind() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + let cli_id = create_fake_rollout( + codex_home.path(), + "2025-02-01T10-00-00", + "2025-02-01T10:00:00Z", + "shared needle", + Some("mock_provider"), + /*git_info*/ None, + )?; + let exec_id = create_fake_rollout_with_source( + codex_home.path(), + "2025-02-01T11-00-00", + "2025-02-01T11:00:00Z", + "shared needle", + Some("mock_provider"), + /*git_info*/ None, + CoreSessionSource::Exec, + )?; + + let mut mcp = init_mcp(codex_home.path()).await?; + let request_id = mcp + .send_thread_search_request(codex_app_server_protocol::ThreadSearchParams { + cursor: None, + limit: Some(10), + sort_key: None, + sort_direction: None, + source_kinds: Some(vec![ThreadSourceKind::Exec]), + archived: None, + search_term: "needle".to_string(), + }) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let ThreadSearchResponse { data, .. 
} = to_response::(resp)?; + + let ids: Vec<_> = data + .iter() + .map(|result| result.thread.id.as_str()) + .collect(); + assert_eq!(ids, vec![exec_id.as_str()]); + assert_ne!(cli_id, exec_id); + + Ok(()) +} + #[tokio::test] async fn thread_list_state_db_only_returns_sqlite_without_jsonl_repair() -> Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/rollout/src/lib.rs b/codex-rs/rollout/src/lib.rs index 897c6d8fd..abbad1da0 100644 --- a/codex-rs/rollout/src/lib.rs +++ b/codex-rs/rollout/src/lib.rs @@ -9,6 +9,7 @@ pub(crate) mod list; pub(crate) mod metadata; pub(crate) mod policy; pub(crate) mod recorder; +pub(crate) mod search; pub(crate) mod session_index; mod sqlite_metrics; pub mod state_db; @@ -60,6 +61,8 @@ pub use policy::should_persist_response_item_for_memories; pub use recorder::RolloutRecorder; pub use recorder::RolloutRecorderParams; pub use recorder::append_rollout_item_to_path; +pub use search::first_rollout_content_match_snippet; +pub use search::search_rollout_paths; pub use session_index::append_thread_name; pub use session_index::find_thread_meta_by_name_str; pub use session_index::find_thread_name_by_id; diff --git a/codex-rs/rollout/src/search.rs b/codex-rs/rollout/src/search.rs new file mode 100644 index 000000000..1773f5afb --- /dev/null +++ b/codex-rs/rollout/src/search.rs @@ -0,0 +1,251 @@ +use std::collections::HashSet; +use std::io; +use std::path::Path; +use std::path::PathBuf; + +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::RolloutLine; +use codex_protocol::protocol::USER_MESSAGE_BEGIN; +use tokio::io::AsyncBufReadExt; +use tokio::process::Command; + +use super::ARCHIVED_SESSIONS_SUBDIR; +use super::SESSIONS_SUBDIR; + +const MATCH_CONTEXT_BEFORE_CHARS: usize = 48; +const MATCH_CONTEXT_AFTER_CHARS: usize = 96; + +pub async fn search_rollout_paths( + rg_command: &Path, 
+ codex_home: &Path, + archived: bool, + search_term: &str, +) -> io::Result> { + let root = codex_home.join(if archived { + ARCHIVED_SESSIONS_SUBDIR + } else { + SESSIONS_SUBDIR + }); + let search_term = json_escaped_search_term(search_term)?; + ripgrep_rollout_paths(rg_command, root.as_path(), search_term.as_str()).await +} + +async fn ripgrep_rollout_paths( + rg_command: &Path, + root: &Path, + search_term: &str, +) -> io::Result> { + if !tokio::fs::try_exists(root).await.unwrap_or(false) { + return Ok(HashSet::new()); + } + + let output = match Command::new(rg_command) + .arg("-l") + .arg("--fixed-strings") + .arg("--no-ignore") + .arg("--glob") + .arg("*.jsonl") + .arg("--") + .arg(search_term) + .arg(root) + .output() + .await + { + Ok(output) => output, + Err(err) if err.kind() == io::ErrorKind::NotFound => { + return scan_rollout_paths(root, search_term).await; + } + Err(err) => return Err(err), + }; + if !output.status.success() { + if output.status.code() == Some(1) && output.stderr.is_empty() { + return Ok(HashSet::new()); + } + + return Err(io::Error::other(format!( + "ripgrep rollout search failed under {}", + root.display() + ))); + } + + let mut matches = HashSet::new(); + for line in String::from_utf8_lossy(output.stdout.as_slice()).lines() { + let path = PathBuf::from(line); + let path = if path.is_absolute() { + path + } else { + root.join(path) + }; + matches.insert(path); + } + + Ok(matches) +} + +async fn scan_rollout_paths(root: &Path, search_term: &str) -> io::Result> { + let mut matches = HashSet::new(); + let mut dirs = vec![root.to_path_buf()]; + + while let Some(dir) = dirs.pop() { + let mut entries = match tokio::fs::read_dir(dir).await { + Ok(entries) => entries, + Err(err) if err.kind() == io::ErrorKind::NotFound => continue, + Err(err) => return Err(err), + }; + while let Some(entry) = entries.next_entry().await? 
{ + let path = entry.path(); + let file_type = entry.file_type().await?; + if file_type.is_dir() { + dirs.push(path); + continue; + } + if !file_type.is_file() + || path.extension().and_then(|extension| extension.to_str()) != Some("jsonl") + { + continue; + } + if rollout_contains(path.as_path(), search_term).await? { + matches.insert(path); + } + } + } + + Ok(matches) +} + +async fn rollout_contains(path: &Path, search_term: &str) -> io::Result { + let file = tokio::fs::File::open(path).await?; + let mut lines = tokio::io::BufReader::new(file).lines(); + while let Some(line) = lines.next_line().await? { + if line.contains(search_term) { + return Ok(true); + } + } + Ok(false) +} + +pub async fn first_rollout_content_match_snippet( + path: &Path, + search_term: &str, +) -> io::Result> { + let file = tokio::fs::File::open(path).await?; + let mut lines = tokio::io::BufReader::new(file).lines(); + let json_search_term = json_escaped_search_term(search_term)?; + while let Some(line) = lines.next_line().await? 
{ + if line.contains(json_search_term.as_str()) + && let Some(snippet) = content_match_snippet(line.as_str(), search_term) + { + return Ok(Some(snippet)); + } + } + Ok(None) +} + +fn json_escaped_search_term(search_term: &str) -> io::Result { + let serialized = serde_json::to_string(search_term).map_err(io::Error::other)?; + Ok(serialized[1..serialized.len() - 1].to_string()) +} + +fn content_match_snippet(jsonl_line: &str, search_term: &str) -> Option { + let rollout_line = serde_json::from_str::(jsonl_line.trim()).ok()?; + let text = conversation_text_from_item(&rollout_line.item)?; + excerpt_around_match(text.as_str(), search_term) +} + +fn conversation_text_from_item(item: &RolloutItem) -> Option { + match item { + RolloutItem::EventMsg(EventMsg::UserMessage(user)) => { + let text = strip_user_message_prefix(user.message.as_str()); + if text.is_empty() { + None + } else { + Some(text.to_string()) + } + } + RolloutItem::EventMsg(EventMsg::AgentMessage(agent)) => { + if agent.message.trim().is_empty() { + None + } else { + Some(agent.message.trim().to_string()) + } + } + RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) => { + let text = content + .iter() + .filter_map(content_item_text) + .collect::>() + .join(" "); + if text.trim().is_empty() || (role != "user" && role != "assistant") { + None + } else { + Some(text) + } + } + RolloutItem::SessionMeta(_) + | RolloutItem::TurnContext(_) + | RolloutItem::EventMsg(_) + | RolloutItem::ResponseItem(_) + | RolloutItem::Compacted(_) => None, + } +} + +fn content_item_text(item: &ContentItem) -> Option<&str> { + match item { + ContentItem::InputText { text } | ContentItem::OutputText { text } => Some(text.as_str()), + ContentItem::InputImage { .. 
} => None, + } +} + +fn strip_user_message_prefix(text: &str) -> &str { + match text.find(USER_MESSAGE_BEGIN) { + Some(idx) => text[idx + USER_MESSAGE_BEGIN.len()..].trim(), + None => text.trim(), + } +} + +fn excerpt_around_match(text: &str, search_term: &str) -> Option { + let normalized = normalize_preview_text(text); + let match_start = normalized.find(search_term)?; + let match_end = match_start.saturating_add(search_term.len()); + let excerpt_start = + char_start_before(normalized.as_str(), match_start, MATCH_CONTEXT_BEFORE_CHARS); + let excerpt_end = char_end_after(normalized.as_str(), match_end, MATCH_CONTEXT_AFTER_CHARS); + let excerpt = normalized[excerpt_start..excerpt_end].trim(); + if excerpt.is_empty() { + return None; + } + + let mut snippet = String::new(); + if excerpt_start > 0 { + snippet.push_str("... "); + } + snippet.push_str(excerpt); + if excerpt_end < normalized.len() { + snippet.push_str(" ..."); + } + Some(snippet) +} + +fn normalize_preview_text(text: &str) -> String { + text.split_whitespace().collect::>().join(" ") +} + +fn char_start_before(text: &str, byte_index: usize, chars_before: usize) -> usize { + text[..byte_index] + .char_indices() + .rev() + .nth(chars_before) + .map(|(idx, _)| idx) + .unwrap_or(0) +} + +fn char_end_after(text: &str, byte_index: usize, chars_after: usize) -> usize { + text[byte_index..] 
+ .char_indices() + .nth(chars_after) + .map(|(offset, _)| byte_index.saturating_add(offset)) + .unwrap_or(text.len()) +} diff --git a/codex-rs/thread-store/Cargo.toml b/codex-rs/thread-store/Cargo.toml index c9f7713b4..5a0608b4d 100644 --- a/codex-rs/thread-store/Cargo.toml +++ b/codex-rs/thread-store/Cargo.toml @@ -16,6 +16,7 @@ workspace = true async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } codex-git-utils = { workspace = true } +codex-install-context = { workspace = true } codex-protocol = { workspace = true } codex-rollout = { workspace = true } codex-state = { workspace = true } diff --git a/codex-rs/thread-store/src/lib.rs b/codex-rs/thread-store/src/lib.rs index 90c7e469a..aec8270a5 100644 --- a/codex-rs/thread-store/src/lib.rs +++ b/codex-rs/thread-store/src/lib.rs @@ -34,9 +34,11 @@ pub use types::LoadThreadHistoryParams; pub use types::ReadThreadByRolloutPathParams; pub use types::ReadThreadParams; pub use types::ResumeThreadParams; +pub use types::SearchThreadsParams; pub use types::SortDirection; pub use types::StoredThread; pub use types::StoredThreadHistory; +pub use types::StoredThreadSearchResult; pub use types::StoredTurn; pub use types::StoredTurnError; pub use types::StoredTurnItemsView; @@ -45,6 +47,7 @@ pub use types::ThreadEventPersistenceMode; pub use types::ThreadMetadataPatch; pub use types::ThreadPage; pub use types::ThreadPersistenceMetadata; +pub use types::ThreadSearchPage; pub use types::ThreadSortKey; pub use types::TurnPage; pub use types::UpdateThreadMetadataParams; diff --git a/codex-rs/thread-store/src/local/list_threads.rs b/codex-rs/thread-store/src/local/list_threads.rs index ede9e9e9c..0a0767da0 100644 --- a/codex-rs/thread-store/src/local/list_threads.rs +++ b/codex-rs/thread-store/src/local/list_threads.rs @@ -107,7 +107,7 @@ pub(super) async fn list_threads( Ok(ThreadPage { items, next_cursor }) } -async fn list_rollout_threads( +pub(super) async fn list_rollout_threads( state_db: 
Option, config: &RolloutConfig, default_model_provider_id: &str, diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index f6a48a400..a6245796a 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -4,6 +4,7 @@ mod helpers; mod list_threads; mod live_writer; mod read_thread; +mod search_threads; mod unarchive_thread; mod update_thread_metadata; @@ -28,9 +29,11 @@ use crate::LoadThreadHistoryParams; use crate::ReadThreadByRolloutPathParams; use crate::ReadThreadParams; use crate::ResumeThreadParams; +use crate::SearchThreadsParams; use crate::StoredThread; use crate::StoredThreadHistory; use crate::ThreadPage; +use crate::ThreadSearchPage; use crate::ThreadStore; use crate::ThreadStoreError; use crate::ThreadStoreResult; @@ -261,6 +264,13 @@ impl ThreadStore for LocalThreadStore { list_threads::list_threads(self, params).await } + async fn search_threads( + &self, + params: SearchThreadsParams, + ) -> ThreadStoreResult { + search_threads::search_threads(self, params).await + } + async fn update_thread_metadata( &self, params: UpdateThreadMetadataParams, diff --git a/codex-rs/thread-store/src/local/search_threads.rs b/codex-rs/thread-store/src/local/search_threads.rs new file mode 100644 index 000000000..ed9ad3a0a --- /dev/null +++ b/codex-rs/thread-store/src/local/search_threads.rs @@ -0,0 +1,217 @@ +use std::collections::HashMap; +use std::collections::HashSet; + +use codex_install_context::InstallContext; +use codex_protocol::ThreadId; +use codex_rollout::RolloutConfig; +use codex_rollout::find_thread_names_by_ids; +use codex_rollout::first_rollout_content_match_snippet; +use codex_rollout::parse_cursor; +use codex_rollout::search_rollout_paths; + +use super::LocalThreadStore; +use super::helpers::distinct_thread_metadata_title; +use super::helpers::set_thread_name_from_title; +use super::helpers::stored_thread_from_rollout_item; +use super::list_threads::list_rollout_threads; 
+use crate::ListThreadsParams; +use crate::SearchThreadsParams; +use crate::SortDirection; +use crate::StoredThreadSearchResult; +use crate::ThreadSearchPage; +use crate::ThreadSortKey; +use crate::ThreadStoreError; +use crate::ThreadStoreResult; + +struct ThreadSearchItem { + item: codex_rollout::ThreadItem, + snippet: String, +} + +pub(super) async fn search_threads( + store: &LocalThreadStore, + params: SearchThreadsParams, +) -> ThreadStoreResult { + let search_term = params.search_term.as_str(); + if search_term.is_empty() { + return Err(ThreadStoreError::InvalidRequest { + message: "thread/search requires search_term".to_string(), + }); + } + let cursor = params + .cursor + .as_deref() + .map(|cursor| { + parse_cursor(cursor).ok_or_else(|| ThreadStoreError::InvalidRequest { + message: format!("invalid cursor: {cursor}"), + }) + }) + .transpose()?; + let sort_key = match params.sort_key { + ThreadSortKey::CreatedAt => codex_rollout::ThreadSortKey::CreatedAt, + ThreadSortKey::UpdatedAt => codex_rollout::ThreadSortKey::UpdatedAt, + }; + let sort_direction = match params.sort_direction { + SortDirection::Asc => codex_rollout::SortDirection::Asc, + SortDirection::Desc => codex_rollout::SortDirection::Desc, + }; + let state_db = store.state_db().await; + let rollout_config = RolloutConfig { + codex_home: store.config.codex_home.clone(), + sqlite_home: store.config.sqlite_home.clone(), + cwd: store.config.codex_home.clone(), + model_provider_id: store.config.default_model_provider_id.clone(), + generate_memories: false, + }; + let rg_command = InstallContext::current().rg_command(); + let matching_paths = search_rollout_paths( + rg_command.as_path(), + store.config.codex_home.as_path(), + params.archived, + search_term, + ) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!("failed to search rollout contents: {err}"), + })?; + if matching_paths.is_empty() { + return Ok(ThreadSearchPage { + items: Vec::new(), + next_cursor: None, + }); + } + 
let mut matching_items = Vec::new(); + let mut page_cursor = cursor; + let scan_page_size = params.page_size.saturating_mul(8).clamp(256, 2048); + let scan_params = ListThreadsParams { + page_size: scan_page_size, + cursor: None, + sort_key: params.sort_key, + sort_direction: params.sort_direction, + allowed_sources: params.allowed_sources.clone(), + model_providers: None, + cwd_filters: None, + archived: params.archived, + search_term: None, + use_state_db_only: state_db.is_some(), + }; + let mut remaining_paths = matching_paths; + + loop { + let page = list_rollout_threads( + state_db.clone(), + &rollout_config, + store.config.default_model_provider_id.as_str(), + &scan_params, + page_cursor.as_ref(), + sort_key, + sort_direction, + ) + .await?; + for item in page.items { + if !remaining_paths.remove(item.path.as_path()) { + continue; + } + let Some(snippet) = + first_rollout_content_match_snippet(item.path.as_path(), search_term) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!("failed to read rollout search match: {err}"), + })? 
+ else { + continue; + }; + matching_items.push(ThreadSearchItem { item, snippet }); + if matching_items.len() > params.page_size { + break; + } + } + page_cursor = page.next_cursor; + if matching_items.len() > params.page_size + || remaining_paths.is_empty() + || page_cursor.is_none() + { + break; + } + } + + let more_matches_available = matching_items.len() > params.page_size; + matching_items.truncate(params.page_size); + let next_cursor = if more_matches_available { + matching_items + .last() + .and_then(|item| cursor_from_thread_search_item(item, params.sort_key)) + } else { + None + } + .as_ref() + .and_then(|cursor| serde_json::to_value(cursor).ok()) + .and_then(|value| value.as_str().map(str::to_owned)); + + let mut items = matching_items + .into_iter() + .filter_map(|item| { + stored_thread_from_rollout_item( + item.item, + params.archived, + store.config.default_model_provider_id.as_str(), + ) + .map(|thread| StoredThreadSearchResult { + thread, + snippet: item.snippet, + }) + }) + .collect::>(); + set_thread_search_result_names(store, &mut items).await; + + Ok(ThreadSearchPage { items, next_cursor }) +} + +fn cursor_from_thread_search_item( + item: &ThreadSearchItem, + sort_key: ThreadSortKey, +) -> Option { + let timestamp = match sort_key { + ThreadSortKey::CreatedAt => item.item.created_at.as_deref()?, + ThreadSortKey::UpdatedAt => item + .item + .updated_at + .as_deref() + .or(item.item.created_at.as_deref())?, + }; + parse_cursor(timestamp) +} + +async fn set_thread_search_result_names( + store: &LocalThreadStore, + items: &mut [StoredThreadSearchResult], +) { + let thread_ids = items + .iter() + .map(|item| item.thread.thread_id) + .collect::>(); + let mut names = HashMap::::with_capacity(thread_ids.len()); + if let Some(state_db_ctx) = store.state_db().await { + for &thread_id in &thread_ids { + let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else { + continue; + }; + if let Some(title) = distinct_thread_metadata_title(&metadata) 
{ + names.insert(thread_id, title); + } + } + } + if names.len() < thread_ids.len() + && let Ok(legacy_names) = + find_thread_names_by_ids(store.config.codex_home.as_path(), &thread_ids).await + { + for (thread_id, title) in legacy_names { + names.entry(thread_id).or_insert(title); + } + } + for item in items { + if let Some(title) = names.get(&item.thread.thread_id).cloned() { + set_thread_name_from_title(&mut item.thread, title); + } + } +} diff --git a/codex-rs/thread-store/src/store.rs b/codex-rs/thread-store/src/store.rs index 7e5119599..7276a980d 100644 --- a/codex-rs/thread-store/src/store.rs +++ b/codex-rs/thread-store/src/store.rs @@ -13,9 +13,11 @@ use crate::LoadThreadHistoryParams; use crate::ReadThreadByRolloutPathParams; use crate::ReadThreadParams; use crate::ResumeThreadParams; +use crate::SearchThreadsParams; use crate::StoredThread; use crate::StoredThreadHistory; use crate::ThreadPage; +use crate::ThreadSearchPage; use crate::ThreadStoreError; use crate::ThreadStoreResult; use crate::TurnPage; @@ -76,6 +78,16 @@ pub trait ThreadStore: Any + Send + Sync { /// Lists stored threads matching the supplied filters. async fn list_threads(&self, params: ListThreadsParams) -> ThreadStoreResult; + /// Searches stored threads and returns search-only preview metadata. + async fn search_threads( + &self, + _params: SearchThreadsParams, + ) -> ThreadStoreResult { + Err(ThreadStoreError::Unsupported { + operation: "thread/search", + }) + } + /// Lists turns within a stored thread. async fn list_turns(&self, _params: ListTurnsParams) -> ThreadStoreResult { Err(ThreadStoreError::Unsupported { diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 508f6321a..ca75ca1b6 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -199,6 +199,25 @@ pub struct ListThreadsParams { pub use_state_db_only: bool, } +/// Parameters for searching thread content. 
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct SearchThreadsParams { + /// Maximum number of threads to return. + pub page_size: usize, + /// Opaque cursor returned by a previous search call. + pub cursor: Option, + /// Sort order requested by the caller. + pub sort_key: ThreadSortKey, + /// Sort direction requested by the caller. + pub sort_direction: SortDirection, + /// Allowed session sources. Empty means implementation default. + pub allowed_sources: Vec, + /// Whether archived threads should be searched instead of active threads. + pub archived: bool, + /// Visible thread content to search for. + pub search_term: String, +} + /// A page of stored thread records. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ThreadPage { @@ -208,6 +227,21 @@ pub struct ThreadPage { pub next_cursor: Option, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StoredThreadSearchResult { + pub thread: StoredThread, + pub snippet: String, +} + +/// A page of thread-search results. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ThreadSearchPage { + /// Search results returned for this page. + pub items: Vec, + /// Opaque cursor to continue searching. + pub next_cursor: Option, +} + /// Requested amount of item detail for stored turns. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub enum StoredTurnItemsView {