[codex] Add rollout-backed thread content search (#23519)

## Summary
- add experimental `thread/search` for local rollout-backed thread
search using `rg` over JSONL rollouts
- return search-specific result rows with optional previews instead of
storing preview data on `StoredThread` or ordinary `Thread` responses
- keep `thread/list` separate from full-content search and document the
new app-server surface

## Testing
- `cargo test -p codex-app-server-protocol`
- `cargo test -p codex-app-server
thread_search_returns_content_and_title_matches -- --nocapture`
This commit is contained in:
Francis Chalissery
2026-05-21 11:52:24 -07:00
committed by GitHub
Unverified
parent 4acb456bfe
commit ac0bff27e7
22 changed files with 935 additions and 2 deletions
+1
View File
@@ -3744,6 +3744,7 @@ dependencies = [
"async-trait",
"chrono",
"codex-git-utils",
"codex-install-context",
"codex-protocol",
"codex-rollout",
"codex-state",
@@ -6546,4 +6546,4 @@
}
],
"title": "ServerNotification"
}
}
@@ -17440,6 +17440,21 @@
"title": "ThreadRollbackResponse",
"type": "object"
},
"ThreadSearchResult": {
"properties": {
"snippet": {
"type": "string"
},
"thread": {
"$ref": "#/definitions/v2/Thread"
}
},
"required": [
"snippet",
"thread"
],
"type": "object"
},
"ThreadSetNameParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
@@ -15264,6 +15264,21 @@
"title": "ThreadRollbackResponse",
"type": "object"
},
"ThreadSearchResult": {
"properties": {
"snippet": {
"type": "string"
},
"thread": {
"$ref": "#/definitions/Thread"
}
},
"required": [
"snippet",
"thread"
],
"type": "object"
},
"ThreadSetNameParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Thread } from "./Thread";
export type ThreadSearchResult = { thread: Thread, snippet: string, };
@@ -401,6 +401,7 @@ export type { ThreadResumeParams } from "./ThreadResumeParams";
export type { ThreadResumeResponse } from "./ThreadResumeResponse";
export type { ThreadRollbackParams } from "./ThreadRollbackParams";
export type { ThreadRollbackResponse } from "./ThreadRollbackResponse";
export type { ThreadSearchResult } from "./ThreadSearchResult";
export type { ThreadSetNameParams } from "./ThreadSetNameParams";
export type { ThreadSetNameResponse } from "./ThreadSetNameResponse";
export type { ThreadSettings } from "./ThreadSettings";
@@ -569,6 +569,12 @@ client_request_definitions! {
serialization: None,
response: v2::ThreadListResponse,
},
#[experimental("thread/search")]
ThreadSearch => "thread/search" {
params: v2::ThreadSearchParams,
serialization: None,
response: v2::ThreadSearchResponse,
},
ThreadLoadedList => "thread/loaded/list" {
params: v2::ThreadLoadedListParams,
serialization: None,
@@ -973,6 +973,34 @@ pub struct ThreadListParams {
pub search_term: Option<String>,
}
// Request parameters for the experimental `thread/search` client request.
// Every field except `search_term` is optional; the server-side handler supplies
// the defaults described on each field.
// NOTE(review): written as a plain `//` comment on purpose — a `///` doc comment here
// would presumably be picked up by the JsonSchema/TS derives and change the generated
// schema and TypeScript output; confirm before converting.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSearchParams {
/// Opaque pagination cursor returned by a previous call.
#[ts(optional = nullable)]
pub cursor: Option<String>,
/// Optional page size; defaults to a reasonable server-side value.
#[ts(optional = nullable)]
pub limit: Option<u32>,
/// Optional sort key; defaults to created_at.
#[ts(optional = nullable)]
pub sort_key: Option<ThreadSortKey>,
/// Optional sort direction; defaults to descending (newest first).
#[ts(optional = nullable)]
pub sort_direction: Option<SortDirection>,
/// Optional source filter; when set, only sessions from these source kinds
/// are returned. When omitted or empty, defaults to interactive sources.
#[ts(optional = nullable)]
pub source_kinds: Option<Vec<ThreadSourceKind>>,
/// Optional archived filter; when set to true, only archived threads are returned.
/// If false or null, only non-archived threads are returned.
#[ts(optional = nullable)]
pub archived: Option<bool>,
/// Required substring/full-text query for thread search.
// The handler trims this value and rejects the request when nothing is left.
pub search_term: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)]
#[serde(untagged)]
pub enum ThreadListCwdFilter {
@@ -1029,6 +1057,29 @@ pub struct ThreadListResponse {
pub backwards_cursor: Option<String>,
}
// One row of a `thread/search` response: the matching thread plus the excerpt of
// conversation text that matched the query.
// NOTE(review): plain `//` comments are used so the derived JSON schema / TypeScript
// output is presumably left untouched; confirm before converting to `///`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSearchResult {
// The thread whose rollout content matched.
pub thread: Thread,
// Excerpt around the first match found in the thread's conversation text.
pub snippet: String,
}
// Response payload for `thread/search`: one page of results plus the cursors needed
// to continue in either direction.
// NOTE(review): plain `//` comments are used so the derived JSON schema / TypeScript
// output is presumably left untouched; confirm before converting to `///`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSearchResponse {
// Search hits for this page, in the requested sort order.
pub data: Vec<ThreadSearchResult>,
/// Opaque cursor to pass to the next call to continue after the last item.
/// if None, there are no more items to return.
pub next_cursor: Option<String>,
/// Opaque cursor to pass as `cursor` when reversing `sortDirection`.
/// This is only populated when the page contains at least one thread.
/// Use it with the opposite `sortDirection`; for timestamp sorts it anchors
/// at the start of the page timestamp so same-second updates are not skipped.
pub backwards_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -1072,6 +1072,9 @@ impl MessageProcessor {
ClientRequest::ThreadList { params, .. } => {
self.thread_processor.thread_list(params).await
}
ClientRequest::ThreadSearch { params, .. } => {
self.thread_processor.thread_search(params).await
}
ClientRequest::ThreadLoadedList { params, .. } => {
self.thread_processor.thread_loaded_list(params).await
}
@@ -214,6 +214,9 @@ use codex_app_server_protocol::ThreadRealtimeStopResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadSearchParams;
use codex_app_server_protocol::ThreadSearchResponse;
use codex_app_server_protocol::ThreadSearchResult;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSetNameResponse;
use codex_app_server_protocol::ThreadSettings;
@@ -409,6 +412,7 @@ use codex_thread_store::ListThreadsParams as StoreListThreadsParams;
use codex_thread_store::LocalThreadStore;
use codex_thread_store::ReadThreadByRolloutPathParams as StoreReadThreadByRolloutPathParams;
use codex_thread_store::ReadThreadParams as StoreReadThreadParams;
use codex_thread_store::SearchThreadsParams as StoreSearchThreadsParams;
use codex_thread_store::SortDirection as StoreSortDirection;
use codex_thread_store::StoredThread;
use codex_thread_store::ThreadMetadataPatch as StoreThreadMetadataPatch;
@@ -589,6 +589,15 @@ impl ThreadRequestProcessor {
.map(|response| Some(response.into()))
}
/// Handles a `thread/search` request.
///
/// Builds the search response and converts it into the client response payload.
/// Any error produced while building the response is returned unchanged.
pub(crate) async fn thread_search(
    &self,
    params: ThreadSearchParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
    let response = self.thread_search_response_inner(params).await?;
    Ok(Some(response.into()))
}
pub(crate) async fn thread_loaded_list(
&self,
params: ThreadLoadedListParams,
@@ -1861,6 +1870,131 @@ impl ThreadRequestProcessor {
})
}
/// Builds the `thread/search` response for `params`.
///
/// Validates the search term, resolves defaults for paging and sorting, pulls pages from
/// the thread store until the requested page is full (or the store has no further
/// cursor), and finally overlays the loaded status of each returned thread.
///
/// Returns an invalid-request error when the trimmed search term is empty; thread store
/// failures are mapped through `thread_store_list_error`.
async fn thread_search_response_inner(
&self,
params: ThreadSearchParams,
) -> Result<ThreadSearchResponse, JSONRPCErrorError> {
let ThreadSearchParams {
cursor,
limit,
sort_key,
sort_direction,
source_kinds,
archived,
search_term,
} = params;
// Surrounding whitespace is not part of the query; a blank query is rejected.
let search_term = search_term.trim().to_string();
let search_term = (!search_term.is_empty())
.then_some(search_term)
.ok_or_else(|| invalid_request("thread/search requires a non-empty searchTerm"))?;
// Page size: caller's limit or the list default, kept within 1..=THREAD_LIST_MAX_LIMIT.
let requested_page_size = limit
.map(|value| value as usize)
.unwrap_or(THREAD_LIST_DEFAULT_LIMIT)
.clamp(1, THREAD_LIST_MAX_LIMIT);
// Defaults: sort by creation time, newest first.
let store_sort_key = match sort_key.unwrap_or(ThreadSortKey::CreatedAt) {
ThreadSortKey::CreatedAt => StoreThreadSortKey::CreatedAt,
ThreadSortKey::UpdatedAt => StoreThreadSortKey::UpdatedAt,
};
let store_sort_direction = sort_direction.unwrap_or(SortDirection::Desc);
let (allowed_sources, source_kind_filter) = compute_source_filters(source_kinds);
let mut cursor_obj = cursor;
let mut last_cursor = cursor_obj.clone();
let mut remaining = requested_page_size;
let mut search_results = Vec::with_capacity(requested_page_size);
let mut next_cursor = None;
// Results can be dropped by the source-kind filter below, so keep asking the store for
// the number of rows still missing until the page is full or there is nothing left.
while remaining > 0 {
let page = self
.thread_store
.search_threads(StoreSearchThreadsParams {
page_size: remaining.min(THREAD_LIST_MAX_LIMIT),
cursor: cursor_obj.clone(),
sort_key: store_sort_key,
sort_direction: match store_sort_direction {
SortDirection::Asc => StoreSortDirection::Asc,
SortDirection::Desc => StoreSortDirection::Desc,
},
allowed_sources: allowed_sources.clone(),
archived: archived.unwrap_or(false),
search_term: search_term.clone(),
})
.await
.map_err(thread_store_list_error)?;
for result in page.items {
// The filter is evaluated against the source enriched with agent nickname/role.
let source = with_thread_spawn_agent_metadata(
result.thread.source.clone(),
result.thread.agent_nickname.clone(),
result.thread.agent_role.clone(),
);
if source_kind_filter
.as_ref()
.is_none_or(|filter| source_kind_matches(&source, filter))
{
search_results.push(result);
if search_results.len() >= requested_page_size {
break;
}
}
}
remaining = requested_page_size.saturating_sub(search_results.len());
next_cursor = page.next_cursor;
if remaining == 0 {
break;
}
// No cursor from the store means there are no further pages to fetch.
let Some(cursor_val) = next_cursor.clone() else {
break;
};
// Guard against a store that hands back the cursor we just used: stop instead of
// requesting the same page forever, and report no continuation.
if last_cursor.as_ref() == Some(&cursor_val) {
next_cursor = None;
break;
}
last_cursor = Some(cursor_val.clone());
cursor_obj = Some(cursor_val);
}
// The backwards cursor is derived from the first row of the page, so it is absent
// for an empty page.
let backwards_cursor = search_results.first().and_then(|result| {
thread_backwards_cursor_for_sort_key(
&result.thread,
store_sort_key,
store_sort_direction,
)
});
let fallback_provider = self.config.model_provider_id.clone();
let mut results = Vec::with_capacity(search_results.len());
let mut status_ids = Vec::with_capacity(search_results.len());
for result in search_results {
let (thread, _) = thread_from_stored_thread(
result.thread,
fallback_provider.as_str(),
&self.config.cwd,
);
status_ids.push(thread.id.clone());
results.push((thread, result.snippet));
}
// Look up statuses for all returned threads in one call, then overwrite the status
// of every thread the watch manager knows about.
let statuses = self
.thread_watch_manager
.loaded_statuses_for_threads(status_ids)
.await;
let data = results
.into_iter()
.map(|(mut thread, snippet)| {
if let Some(status) = statuses.get(&thread.id) {
thread.status = status.clone();
}
ThreadSearchResult { thread, snippet }
})
.collect();
Ok(ThreadSearchResponse {
data,
next_cursor,
backwards_cursor,
})
}
async fn thread_loaded_list_response_inner(
&self,
params: ThreadLoadedListParams,
@@ -88,6 +88,7 @@ use codex_app_server_protocol::ThreadRealtimeStartParams;
use codex_app_server_protocol::ThreadRealtimeStopParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadSearchParams;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSettingsUpdateParams;
use codex_app_server_protocol::ThreadShellCommandParams;
@@ -508,6 +509,15 @@ impl McpProcess {
self.send_request("thread/list", params).await
}
/// Send a `thread/search` JSON-RPC request.
///
/// Serializes `params` to a JSON value and forwards it to `send_request`, returning
/// whatever request id that call yields.
pub async fn send_thread_search_request(
    &mut self,
    params: ThreadSearchParams,
) -> anyhow::Result<i64> {
    let payload = serde_json::to_value(params)?;
    self.send_request("thread/search", Some(payload)).await
}
/// Send a `thread/loaded/list` JSON-RPC request.
pub async fn send_thread_loaded_list_request(
&mut self,
@@ -17,6 +17,7 @@ use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::SortDirection;
use codex_app_server_protocol::ThreadListCwdFilter;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadSearchResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
use codex_app_server_protocol::ThreadStartParams;
@@ -660,6 +661,161 @@ sqlite = true
Ok(())
}
// Verifies that `thread/search` returns only the threads whose rollout content contains
// the query, in the default order, each with a snippet.
#[tokio::test]
async fn thread_search_returns_content_matches() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
// Three rollouts: the oldest and newest contain "needle", the middle one does not.
let older_match = create_fake_rollout(
codex_home.path(),
"2025-01-02T10-00-00",
"2025-01-02T10:00:00Z",
"match: needle",
Some("mock_provider"),
/*git_info*/ None,
)?;
let _non_match = create_fake_rollout(
codex_home.path(),
"2025-01-02T11-00-00",
"2025-01-02T11:00:00Z",
"no hit here",
Some("mock_provider"),
/*git_info*/ None,
)?;
let newer_match = create_fake_rollout(
codex_home.path(),
"2025-01-02T12-00-00",
"2025-01-02T12:00:00Z",
"needle suffix",
Some("mock_provider"),
/*git_info*/ None,
)?;
let mut mcp = init_mcp(codex_home.path()).await?;
// All optional parameters are left unset so the server defaults apply.
let request_id = mcp
.send_thread_search_request(codex_app_server_protocol::ThreadSearchParams {
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
source_kinds: None,
archived: None,
search_term: "needle".to_string(),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let ThreadSearchResponse {
data, next_cursor, ..
} = to_response::<ThreadSearchResponse>(resp)?;
// Both matches fit in one page, so no continuation cursor is expected.
assert_eq!(next_cursor, None);
let ids: Vec<_> = data
.iter()
.map(|result| result.thread.id.as_str())
.collect();
// Default ordering is created_at descending: the newer match comes first.
assert_eq!(ids, vec![newer_match, older_match]);
assert_eq!(data[0].snippet, "needle suffix");
Ok(())
}
// Verifies that a query containing double quotes and a backslash is found in the rollout
// and that the returned snippet shows the text as the user typed it.
#[tokio::test]
async fn thread_search_matches_json_escaped_content() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
// The same string is used as the rollout message and as the query.
let search_term = r#"quoted "needle" \ path"#;
let thread_id = create_fake_rollout(
codex_home.path(),
"2025-01-02T10-00-00",
"2025-01-02T10:00:00Z",
search_term,
Some("mock_provider"),
/*git_info*/ None,
)?;
let mut mcp = init_mcp(codex_home.path()).await?;
let request_id = mcp
.send_thread_search_request(codex_app_server_protocol::ThreadSearchParams {
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
source_kinds: None,
archived: None,
search_term: search_term.to_string(),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let ThreadSearchResponse { data, .. } = to_response::<ThreadSearchResponse>(resp)?;
assert_eq!(data.len(), 1);
assert_eq!(data[0].thread.id, thread_id);
// The snippet is the unescaped message text, not its JSON-escaped form.
assert_eq!(data[0].snippet, search_term);
Ok(())
}
// Verifies that the `source_kinds` filter restricts search hits to the requested source.
#[tokio::test]
async fn thread_search_filters_by_source_kind() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
// Two rollouts with identical matching content; only the second is created with the
// Exec session source.
let cli_id = create_fake_rollout(
codex_home.path(),
"2025-02-01T10-00-00",
"2025-02-01T10:00:00Z",
"shared needle",
Some("mock_provider"),
/*git_info*/ None,
)?;
let exec_id = create_fake_rollout_with_source(
codex_home.path(),
"2025-02-01T11-00-00",
"2025-02-01T11:00:00Z",
"shared needle",
Some("mock_provider"),
/*git_info*/ None,
CoreSessionSource::Exec,
)?;
let mut mcp = init_mcp(codex_home.path()).await?;
let request_id = mcp
.send_thread_search_request(codex_app_server_protocol::ThreadSearchParams {
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
source_kinds: Some(vec![ThreadSourceKind::Exec]),
archived: None,
search_term: "needle".to_string(),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let ThreadSearchResponse { data, .. } = to_response::<ThreadSearchResponse>(resp)?;
let ids: Vec<_> = data
.iter()
.map(|result| result.thread.id.as_str())
.collect();
// Only the Exec thread may be returned even though both rollouts contain the query.
assert_eq!(ids, vec![exec_id.as_str()]);
// Sanity check that the two fixtures really are distinct threads.
assert_ne!(cli_id, exec_id);
Ok(())
}
#[tokio::test]
async fn thread_list_state_db_only_returns_sqlite_without_jsonl_repair() -> Result<()> {
let codex_home = TempDir::new()?;
+3
View File
@@ -9,6 +9,7 @@ pub(crate) mod list;
pub(crate) mod metadata;
pub(crate) mod policy;
pub(crate) mod recorder;
pub(crate) mod search;
pub(crate) mod session_index;
mod sqlite_metrics;
pub mod state_db;
@@ -60,6 +61,8 @@ pub use policy::should_persist_response_item_for_memories;
pub use recorder::RolloutRecorder;
pub use recorder::RolloutRecorderParams;
pub use recorder::append_rollout_item_to_path;
pub use search::first_rollout_content_match_snippet;
pub use search::search_rollout_paths;
pub use session_index::append_thread_name;
pub use session_index::find_thread_meta_by_name_str;
pub use session_index::find_thread_name_by_id;
+251
View File
@@ -0,0 +1,251 @@
use std::collections::HashSet;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
const MATCH_CONTEXT_BEFORE_CHARS: usize = 48;
const MATCH_CONTEXT_AFTER_CHARS: usize = 96;
/// Returns the rollout files under `codex_home` whose raw JSONL contains `search_term`.
///
/// The archived sessions directory is searched when `archived` is true, the active
/// sessions directory otherwise. The term is JSON-escaped before searching so it is
/// compared against the text in the form it takes inside a JSON string.
///
/// # Errors
/// Propagates failures from escaping the term and from the underlying search.
pub async fn search_rollout_paths(
    rg_command: &Path,
    codex_home: &Path,
    archived: bool,
    search_term: &str,
) -> io::Result<HashSet<PathBuf>> {
    let subdir = if archived {
        ARCHIVED_SESSIONS_SUBDIR
    } else {
        SESSIONS_SUBDIR
    };
    let root = codex_home.join(subdir);
    let escaped_term = json_escaped_search_term(search_term)?;
    ripgrep_rollout_paths(rg_command, &root, &escaped_term).await
}
/// Lists the `*.jsonl` files under `root` that contain `search_term` as a literal string.
///
/// Runs `rg_command` with `-l --fixed-strings --no-ignore --glob *.jsonl`. When the
/// ripgrep executable cannot be found, the search falls back to the in-process scan in
/// `scan_rollout_paths`. A missing (or unreadable) `root` yields an empty set.
///
/// # Errors
/// Returns the spawn error for any failure other than "executable not found", and an
/// error carrying ripgrep's exit status and stderr when ripgrep itself reports a failure.
async fn ripgrep_rollout_paths(
    rg_command: &Path,
    root: &Path,
    search_term: &str,
) -> io::Result<HashSet<PathBuf>> {
    if !tokio::fs::try_exists(root).await.unwrap_or(false) {
        return Ok(HashSet::new());
    }
    let output = match Command::new(rg_command)
        .arg("-l")
        .arg("--fixed-strings")
        .arg("--no-ignore")
        .arg("--glob")
        .arg("*.jsonl")
        // `--` keeps a search term that starts with `-` from being parsed as a flag.
        .arg("--")
        .arg(search_term)
        .arg(root)
        .output()
        .await
    {
        Ok(output) => output,
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            return scan_rollout_paths(root, search_term).await;
        }
        Err(err) => return Err(err),
    };
    if !output.status.success() {
        // Exit code 1 with nothing on stderr is treated as "no file matched".
        if output.status.code() == Some(1) && output.stderr.is_empty() {
            return Ok(HashSet::new());
        }
        // Surface what ripgrep reported; without it the failure cannot be diagnosed.
        let stderr = String::from_utf8_lossy(&output.stderr);
        let detail = stderr.trim();
        let mut message = format!(
            "ripgrep rollout search failed under {} ({})",
            root.display(),
            output.status
        );
        if !detail.is_empty() {
            message.push_str(": ");
            message.push_str(detail);
        }
        return Err(io::Error::other(message));
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    let matches = stdout
        .lines()
        .map(|line| {
            let path = PathBuf::from(line);
            // Relative paths in the output are resolved against the search root.
            if path.is_absolute() {
                path
            } else {
                root.join(path)
            }
        })
        .collect();
    Ok(matches)
}
/// Walks `root` recursively and collects every `*.jsonl` file that contains `search_term`.
///
/// Directories that disappear before they can be read are skipped; any other I/O error
/// aborts the scan and is returned to the caller.
async fn scan_rollout_paths(root: &Path, search_term: &str) -> io::Result<HashSet<PathBuf>> {
    let mut found = HashSet::new();
    // Explicit stack of directories still to visit (depth-first, no recursion).
    let mut pending = vec![root.to_path_buf()];
    while let Some(current) = pending.pop() {
        let mut reader = match tokio::fs::read_dir(current).await {
            Ok(reader) => reader,
            Err(err) if err.kind() == io::ErrorKind::NotFound => continue,
            Err(err) => return Err(err),
        };
        while let Some(entry) = reader.next_entry().await? {
            let candidate = entry.path();
            let kind = entry.file_type().await?;
            if kind.is_dir() {
                pending.push(candidate);
                continue;
            }
            let is_jsonl = candidate.extension().and_then(|ext| ext.to_str()) == Some("jsonl");
            if kind.is_file() && is_jsonl && rollout_contains(&candidate, search_term).await? {
                found.insert(candidate);
            }
        }
    }
    Ok(found)
}
/// Reports whether any line of the file at `path` contains `search_term`.
///
/// Reading stops at the first matching line. Errors from opening or reading the file are
/// returned to the caller.
async fn rollout_contains(path: &Path, search_term: &str) -> io::Result<bool> {
    let reader = tokio::io::BufReader::new(tokio::fs::File::open(path).await?);
    let mut lines = reader.lines();
    loop {
        match lines.next_line().await? {
            Some(line) if line.contains(search_term) => return Ok(true),
            Some(_) => {}
            None => return Ok(false),
        }
    }
}
/// Returns a snippet for the first line of the rollout at `path` whose conversation text
/// contains `search_term`.
///
/// Each line is first checked for the JSON-escaped term as a cheap filter; only lines that
/// pass are parsed. `Ok(None)` means no line produced a snippet.
///
/// # Errors
/// Returns errors from opening or reading the file and from escaping the term.
pub async fn first_rollout_content_match_snippet(
    path: &Path,
    search_term: &str,
) -> io::Result<Option<String>> {
    let file = tokio::fs::File::open(path).await?;
    let mut lines = tokio::io::BufReader::new(file).lines();
    let escaped_term = json_escaped_search_term(search_term)?;
    while let Some(line) = lines.next_line().await? {
        if !line.contains(escaped_term.as_str()) {
            continue;
        }
        // The raw line matched; it only counts when the match is in conversation text.
        if let Some(snippet) = content_match_snippet(line.as_str(), search_term) {
            return Ok(Some(snippet));
        }
    }
    Ok(None)
}
fn json_escaped_search_term(search_term: &str) -> io::Result<String> {
let serialized = serde_json::to_string(search_term).map_err(io::Error::other)?;
Ok(serialized[1..serialized.len() - 1].to_string())
}
/// Parses one rollout JSONL line and returns an excerpt around `search_term`.
///
/// Yields `None` when the line does not parse as a `RolloutLine`, when the item carries
/// no conversation text, or when the text does not contain the term.
fn content_match_snippet(jsonl_line: &str, search_term: &str) -> Option<String> {
    let parsed: RolloutLine = serde_json::from_str(jsonl_line.trim()).ok()?;
    conversation_text_from_item(&parsed.item)
        .and_then(|text| excerpt_around_match(text.as_str(), search_term))
}
/// Extracts the conversation text carried by a rollout item, if any.
///
/// Text is taken from user message events (with the user-message prefix stripped), agent
/// message events, and response messages whose role is `user` or `assistant`. Every other
/// item, and any item whose text is blank, yields `None`.
fn conversation_text_from_item(item: &RolloutItem) -> Option<String> {
    match item {
        RolloutItem::EventMsg(EventMsg::UserMessage(user)) => {
            let text = strip_user_message_prefix(user.message.as_str());
            (!text.is_empty()).then(|| text.to_string())
        }
        RolloutItem::EventMsg(EventMsg::AgentMessage(agent)) => {
            let text = agent.message.trim();
            (!text.is_empty()).then(|| text.to_string())
        }
        RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) => {
            // Text parts of the message are joined with single spaces.
            let text = content
                .iter()
                .filter_map(content_item_text)
                .collect::<Vec<_>>()
                .join(" ");
            let is_conversation_role = role == "user" || role == "assistant";
            (is_conversation_role && !text.trim().is_empty()).then_some(text)
        }
        RolloutItem::SessionMeta(_)
        | RolloutItem::TurnContext(_)
        | RolloutItem::EventMsg(_)
        | RolloutItem::ResponseItem(_)
        | RolloutItem::Compacted(_) => None,
    }
}
/// Returns the text of a content item; image items have none.
fn content_item_text(item: &ContentItem) -> Option<&str> {
    match item {
        ContentItem::InputText { text } => Some(text.as_str()),
        ContentItem::OutputText { text } => Some(text.as_str()),
        ContentItem::InputImage { .. } => None,
    }
}
/// Returns the part of `text` after the first `USER_MESSAGE_BEGIN` marker, trimmed.
///
/// When the marker is absent the whole text is returned, trimmed.
fn strip_user_message_prefix(text: &str) -> &str {
    let body = text
        .find(USER_MESSAGE_BEGIN)
        .map_or(text, |idx| &text[idx + USER_MESSAGE_BEGIN.len()..]);
    body.trim()
}
/// Builds a short excerpt of `text` centered on the first occurrence of `search_term`.
///
/// `text` is whitespace-normalized first (runs of whitespace become single spaces). The
/// term is looked up verbatim; if that fails it is looked up again in the same
/// whitespace-normalized form, so a term containing a newline, tab or repeated spaces
/// still matches the normalized text. The excerpt keeps up to
/// `MATCH_CONTEXT_BEFORE_CHARS` / `MATCH_CONTEXT_AFTER_CHARS` characters of context and
/// is marked with `...` on each side that was cut.
///
/// Returns `None` when the term is not found or the excerpt is empty after trimming.
fn excerpt_around_match(text: &str, search_term: &str) -> Option<String> {
    let normalized = normalize_preview_text(text);
    let (match_start, match_len) = match normalized.find(search_term) {
        Some(start) => (start, search_term.len()),
        None => {
            // The haystack had its whitespace collapsed, so give the needle the same
            // treatment before giving up.
            let normalized_term = normalize_preview_text(search_term);
            if normalized_term.is_empty() {
                return None;
            }
            let start = normalized.find(normalized_term.as_str())?;
            (start, normalized_term.len())
        }
    };
    let match_end = match_start.saturating_add(match_len);
    let excerpt_start =
        char_start_before(normalized.as_str(), match_start, MATCH_CONTEXT_BEFORE_CHARS);
    let excerpt_end = char_end_after(normalized.as_str(), match_end, MATCH_CONTEXT_AFTER_CHARS);
    let excerpt = normalized[excerpt_start..excerpt_end].trim();
    if excerpt.is_empty() {
        return None;
    }
    let mut snippet = String::new();
    if excerpt_start > 0 {
        snippet.push_str("... ");
    }
    snippet.push_str(excerpt);
    if excerpt_end < normalized.len() {
        snippet.push_str(" ...");
    }
    Some(snippet)
}
/// Collapses every run of whitespace in `text` into a single space and drops leading and
/// trailing whitespace.
fn normalize_preview_text(text: &str) -> String {
    let mut collapsed = String::with_capacity(text.len());
    for word in text.split_whitespace() {
        // Words are never empty, so an empty buffer means this is the first word.
        if !collapsed.is_empty() {
            collapsed.push(' ');
        }
        collapsed.push_str(word);
    }
    collapsed
}
/// Returns the byte index that lies `chars_before` characters before `byte_index`.
///
/// `byte_index` must be on a character boundary of `text`. When fewer than `chars_before`
/// characters precede it, the result is 0; when `chars_before` is 0 the result is
/// `byte_index` itself. The span `text[result..byte_index]` therefore holds at most
/// `chars_before` characters.
fn char_start_before(text: &str, byte_index: usize, chars_before: usize) -> usize {
    // `nth` is zero-based: the n-th character walking backwards is at position n - 1.
    let Some(last_wanted) = chars_before.checked_sub(1) else {
        return byte_index;
    };
    text[..byte_index]
        .char_indices()
        .rev()
        .nth(last_wanted)
        .map_or(0, |(idx, _)| idx)
}
/// Returns the byte index that lies `chars_after` characters after `byte_index`.
///
/// `byte_index` must be on a character boundary of `text`. When fewer than
/// `chars_after + 1` characters remain, the result is `text.len()`.
fn char_end_after(text: &str, byte_index: usize, chars_after: usize) -> usize {
    match text[byte_index..].char_indices().nth(chars_after) {
        Some((offset, _)) => byte_index.saturating_add(offset),
        None => text.len(),
    }
}
+1
View File
@@ -16,6 +16,7 @@ workspace = true
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
codex-git-utils = { workspace = true }
codex-install-context = { workspace = true }
codex-protocol = { workspace = true }
codex-rollout = { workspace = true }
codex-state = { workspace = true }
+3
View File
@@ -34,9 +34,11 @@ pub use types::LoadThreadHistoryParams;
pub use types::ReadThreadByRolloutPathParams;
pub use types::ReadThreadParams;
pub use types::ResumeThreadParams;
pub use types::SearchThreadsParams;
pub use types::SortDirection;
pub use types::StoredThread;
pub use types::StoredThreadHistory;
pub use types::StoredThreadSearchResult;
pub use types::StoredTurn;
pub use types::StoredTurnError;
pub use types::StoredTurnItemsView;
@@ -45,6 +47,7 @@ pub use types::ThreadEventPersistenceMode;
pub use types::ThreadMetadataPatch;
pub use types::ThreadPage;
pub use types::ThreadPersistenceMetadata;
pub use types::ThreadSearchPage;
pub use types::ThreadSortKey;
pub use types::TurnPage;
pub use types::UpdateThreadMetadataParams;
@@ -107,7 +107,7 @@ pub(super) async fn list_threads(
Ok(ThreadPage { items, next_cursor })
}
async fn list_rollout_threads(
pub(super) async fn list_rollout_threads(
state_db: Option<codex_rollout::StateDbHandle>,
config: &RolloutConfig,
default_model_provider_id: &str,
+10
View File
@@ -4,6 +4,7 @@ mod helpers;
mod list_threads;
mod live_writer;
mod read_thread;
mod search_threads;
mod unarchive_thread;
mod update_thread_metadata;
@@ -28,9 +29,11 @@ use crate::LoadThreadHistoryParams;
use crate::ReadThreadByRolloutPathParams;
use crate::ReadThreadParams;
use crate::ResumeThreadParams;
use crate::SearchThreadsParams;
use crate::StoredThread;
use crate::StoredThreadHistory;
use crate::ThreadPage;
use crate::ThreadSearchPage;
use crate::ThreadStore;
use crate::ThreadStoreError;
use crate::ThreadStoreResult;
@@ -261,6 +264,13 @@ impl ThreadStore for LocalThreadStore {
list_threads::list_threads(self, params).await
}
// Local implementation of thread search: delegates to the `search_threads` module,
// passing this store and the caller's parameters through unchanged.
async fn search_threads(
&self,
params: SearchThreadsParams,
) -> ThreadStoreResult<ThreadSearchPage> {
search_threads::search_threads(self, params).await
}
async fn update_thread_metadata(
&self,
params: UpdateThreadMetadataParams,
@@ -0,0 +1,217 @@
use std::collections::HashMap;
use std::collections::HashSet;
use codex_install_context::InstallContext;
use codex_protocol::ThreadId;
use codex_rollout::RolloutConfig;
use codex_rollout::find_thread_names_by_ids;
use codex_rollout::first_rollout_content_match_snippet;
use codex_rollout::parse_cursor;
use codex_rollout::search_rollout_paths;
use super::LocalThreadStore;
use super::helpers::distinct_thread_metadata_title;
use super::helpers::set_thread_name_from_title;
use super::helpers::stored_thread_from_rollout_item;
use super::list_threads::list_rollout_threads;
use crate::ListThreadsParams;
use crate::SearchThreadsParams;
use crate::SortDirection;
use crate::StoredThreadSearchResult;
use crate::ThreadSearchPage;
use crate::ThreadSortKey;
use crate::ThreadStoreError;
use crate::ThreadStoreResult;
/// A listed rollout thread paired with the snippet found for it, held until the page is
/// converted into stored-thread search results.
struct ThreadSearchItem {
/// The thread as returned by the rollout listing.
item: codex_rollout::ThreadItem,
/// Excerpt of conversation text around the first match in the thread's rollout.
snippet: String,
}
/// Searches local rollout content for `params.search_term` and returns one page of
/// matching threads, each with a snippet.
///
/// The search runs in two stages: the rollout files containing the term are located
/// first, then the regular thread listing is walked in the requested order and only
/// threads whose rollout path is among those files (and that yield a snippet) are kept.
///
/// Returns an invalid-request error for an empty search term or a cursor that cannot be
/// parsed, and an internal error when searching or reading rollout content fails.
pub(super) async fn search_threads(
store: &LocalThreadStore,
params: SearchThreadsParams,
) -> ThreadStoreResult<ThreadSearchPage> {
let search_term = params.search_term.as_str();
if search_term.is_empty() {
return Err(ThreadStoreError::InvalidRequest {
message: "thread/search requires search_term".to_string(),
});
}
// Decode the caller's opaque cursor; anything `parse_cursor` rejects is a bad request.
let cursor = params
.cursor
.as_deref()
.map(|cursor| {
parse_cursor(cursor).ok_or_else(|| ThreadStoreError::InvalidRequest {
message: format!("invalid cursor: {cursor}"),
})
})
.transpose()?;
let sort_key = match params.sort_key {
ThreadSortKey::CreatedAt => codex_rollout::ThreadSortKey::CreatedAt,
ThreadSortKey::UpdatedAt => codex_rollout::ThreadSortKey::UpdatedAt,
};
let sort_direction = match params.sort_direction {
SortDirection::Asc => codex_rollout::SortDirection::Asc,
SortDirection::Desc => codex_rollout::SortDirection::Desc,
};
let state_db = store.state_db().await;
// NOTE(review): `cwd` is filled with `codex_home` here — presumably the listing does
// not depend on it; confirm.
let rollout_config = RolloutConfig {
codex_home: store.config.codex_home.clone(),
sqlite_home: store.config.sqlite_home.clone(),
cwd: store.config.codex_home.clone(),
model_provider_id: store.config.default_model_provider_id.clone(),
generate_memories: false,
};
// Stage 1: find every rollout file whose content contains the term.
let rg_command = InstallContext::current().rg_command();
let matching_paths = search_rollout_paths(
rg_command.as_path(),
store.config.codex_home.as_path(),
params.archived,
search_term,
)
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to search rollout contents: {err}"),
})?;
if matching_paths.is_empty() {
return Ok(ThreadSearchPage {
items: Vec::new(),
next_cursor: None,
});
}
let mut matching_items = Vec::new();
let mut page_cursor = cursor;
// Listing pages are larger than the requested page (8x, kept within 256..=2048)
// because only some of the listed threads will be content matches.
let scan_page_size = params.page_size.saturating_mul(8).clamp(256, 2048);
// The cursor is passed to `list_rollout_threads` separately, so it is left unset here.
let scan_params = ListThreadsParams {
page_size: scan_page_size,
cursor: None,
sort_key: params.sort_key,
sort_direction: params.sort_direction,
allowed_sources: params.allowed_sources.clone(),
model_providers: None,
cwd_filters: None,
archived: params.archived,
search_term: None,
use_state_db_only: state_db.is_some(),
};
let mut remaining_paths = matching_paths;
// Stage 2: walk the listing in sort order and keep the threads that matched.
loop {
let page = list_rollout_threads(
state_db.clone(),
&rollout_config,
store.config.default_model_provider_id.as_str(),
&scan_params,
page_cursor.as_ref(),
sort_key,
sort_direction,
)
.await?;
for item in page.items {
// `remove` returns false when this thread's rollout was not a content match.
if !remaining_paths.remove(item.path.as_path()) {
continue;
}
// A file can match without yielding a snippet (the snippet helper returned
// `None`); such threads are skipped.
let Some(snippet) =
first_rollout_content_match_snippet(item.path.as_path(), search_term)
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to read rollout search match: {err}"),
})?
else {
continue;
};
matching_items.push(ThreadSearchItem { item, snippet });
// One item beyond the page size is collected to learn whether more exist.
if matching_items.len() > params.page_size {
break;
}
}
page_cursor = page.next_cursor;
// Stop when the page (plus the probe item) is full, every matching file has been
// accounted for, or the listing has no further pages.
if matching_items.len() > params.page_size
|| remaining_paths.is_empty()
|| page_cursor.is_none()
{
break;
}
}
let more_matches_available = matching_items.len() > params.page_size;
matching_items.truncate(params.page_size);
// The continuation cursor is built from the last returned item's timestamp and is
// only emitted when the probe item showed that more matches exist.
// NOTE(review): how threads sharing that timestamp are handled depends on the
// cursor semantics of the listing — confirm.
let next_cursor = if more_matches_available {
matching_items
.last()
.and_then(|item| cursor_from_thread_search_item(item, params.sort_key))
} else {
None
}
.as_ref()
.and_then(|cursor| serde_json::to_value(cursor).ok())
.and_then(|value| value.as_str().map(str::to_owned));
// Items that cannot be converted into a stored thread are dropped from the page.
let mut items = matching_items
.into_iter()
.filter_map(|item| {
stored_thread_from_rollout_item(
item.item,
params.archived,
store.config.default_model_provider_id.as_str(),
)
.map(|thread| StoredThreadSearchResult {
thread,
snippet: item.snippet,
})
})
.collect::<Vec<_>>();
set_thread_search_result_names(store, &mut items).await;
Ok(ThreadSearchPage { items, next_cursor })
}
/// Builds a pagination cursor from the timestamp of `item` that corresponds to `sort_key`.
///
/// For `UpdatedAt` the creation timestamp is used when no update timestamp is present.
/// Returns `None` when the needed timestamp is missing or `parse_cursor` rejects it.
fn cursor_from_thread_search_item(
    item: &ThreadSearchItem,
    sort_key: ThreadSortKey,
) -> Option<codex_rollout::Cursor> {
    let thread = &item.item;
    let created_at = thread.created_at.as_deref();
    let timestamp = match sort_key {
        ThreadSortKey::CreatedAt => created_at,
        ThreadSortKey::UpdatedAt => thread.updated_at.as_deref().or(created_at),
    };
    parse_cursor(timestamp?)
}
/// Fills in the thread name of every search result that has a known title.
///
/// Titles are looked up in the state database first (when one is available). If that
/// leaves any thread without a title, the names found by `find_thread_names_by_ids` are
/// used for the threads still missing one. Lookup failures are ignored.
async fn set_thread_search_result_names(
    store: &LocalThreadStore,
    items: &mut [StoredThreadSearchResult],
) {
    let thread_ids = items
        .iter()
        .map(|item| item.thread.thread_id)
        .collect::<HashSet<_>>();
    let mut names = HashMap::<ThreadId, String>::with_capacity(thread_ids.len());
    if let Some(state_db_ctx) = store.state_db().await {
        for thread_id in thread_ids.iter().copied() {
            let title = match state_db_ctx.get_thread(thread_id).await {
                Ok(Some(metadata)) => distinct_thread_metadata_title(&metadata),
                _ => None,
            };
            if let Some(title) = title {
                names.insert(thread_id, title);
            }
        }
    }
    if names.len() < thread_ids.len()
        && let Ok(legacy_names) =
            find_thread_names_by_ids(store.config.codex_home.as_path(), &thread_ids).await
    {
        for (thread_id, title) in legacy_names {
            // Titles already taken from the state database win over these names.
            names.entry(thread_id).or_insert(title);
        }
    }
    for item in items.iter_mut() {
        if let Some(title) = names.get(&item.thread.thread_id) {
            set_thread_name_from_title(&mut item.thread, title.clone());
        }
    }
}
+12
View File
@@ -13,9 +13,11 @@ use crate::LoadThreadHistoryParams;
use crate::ReadThreadByRolloutPathParams;
use crate::ReadThreadParams;
use crate::ResumeThreadParams;
use crate::SearchThreadsParams;
use crate::StoredThread;
use crate::StoredThreadHistory;
use crate::ThreadPage;
use crate::ThreadSearchPage;
use crate::ThreadStoreError;
use crate::ThreadStoreResult;
use crate::TurnPage;
@@ -76,6 +78,16 @@ pub trait ThreadStore: Any + Send + Sync {
/// Lists stored threads matching the supplied filters.
async fn list_threads(&self, params: ListThreadsParams) -> ThreadStoreResult<ThreadPage>;
/// Searches stored threads and returns search-only preview metadata.
///
/// The default implementation reports the operation as unsupported; stores that can
/// search thread content override this method.
async fn search_threads(
&self,
_params: SearchThreadsParams,
) -> ThreadStoreResult<ThreadSearchPage> {
Err(ThreadStoreError::Unsupported {
operation: "thread/search",
})
}
/// Lists turns within a stored thread.
async fn list_turns(&self, _params: ListTurnsParams) -> ThreadStoreResult<TurnPage> {
Err(ThreadStoreError::Unsupported {
+34
View File
@@ -199,6 +199,25 @@ pub struct ListThreadsParams {
pub use_state_db_only: bool,
}
/// Parameters for searching thread content.
///
/// Unlike the app-server request type, every field here is already resolved: the caller
/// supplies a concrete page size, sort order and archived flag.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SearchThreadsParams {
/// Maximum number of threads to return.
pub page_size: usize,
/// Opaque cursor returned by a previous search call.
pub cursor: Option<String>,
/// Sort order requested by the caller.
pub sort_key: ThreadSortKey,
/// Sort direction requested by the caller.
pub sort_direction: SortDirection,
/// Allowed session sources. Empty means implementation default.
pub allowed_sources: Vec<SessionSource>,
/// Whether archived threads should be searched instead of active threads.
pub archived: bool,
/// Visible thread content to search for.
/// The local store rejects an empty value as an invalid request.
pub search_term: String,
}
/// A page of stored thread records.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ThreadPage {
@@ -208,6 +227,21 @@ pub struct ThreadPage {
pub next_cursor: Option<String>,
}
/// A single thread-search hit: the stored thread and the snippet that matched.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StoredThreadSearchResult {
/// The stored thread whose content matched the search term.
pub thread: StoredThread,
/// Excerpt of the thread's conversation text around the match.
pub snippet: String,
}
/// A page of thread-search results.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ThreadSearchPage {
/// Search results returned for this page.
pub items: Vec<StoredThreadSearchResult>,
/// Opaque cursor to continue searching.
/// The local store leaves this as `None` when it found no matches beyond this page.
pub next_cursor: Option<String>,
}
/// Requested amount of item detail for stored turns.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum StoredTurnItemsView {