Support multiple cwd filters for thread list (#18502)

## Summary

- Teach app-server `thread/list` to accept either a single `cwd` or an
array of cwd filters, returning threads whose recorded session cwd
matches any requested path
- Add `useStateDbOnly` as an explicit opt-in fast path for callers that
want to answer `thread/list` from SQLite without scanning JSONL rollout
files
- Preserve backwards compatibility: by default, `thread/list` still
scans JSONL rollouts and repairs SQLite state
- Wire the new cwd array and SQLite-only options through app-server,
local/remote thread-store, rollout listing, generated TypeScript/schema
fixtures, proto output, and docs

## Test Plan

- `cargo test -p codex-app-server-protocol`
- `cargo test -p codex-rollout`
- `cargo test -p codex-thread-store`
- `cargo test -p codex-app-server thread_list`
- `just fmt`
- `just fix -p codex-app-server-protocol -p codex-rollout -p
codex-thread-store -p codex-app-server`
- `cargo build -p codex-cli --bin codex`
This commit is contained in:
acrognale-oai
2026-04-22 06:10:09 -04:00
committed by GitHub
Unverified
parent b04ffeee4c
commit 4f8c58f737
32 changed files with 1183 additions and 133 deletions
+26 -5
View File
@@ -3082,6 +3082,19 @@
],
"type": "object"
},
"ThreadListCwdFilter": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"type": "string"
},
"type": "array"
}
]
},
"ThreadListParams": {
"properties": {
"archived": {
@@ -3099,11 +3112,15 @@
]
},
"cwd": {
"description": "Optional cwd filter; when set, only threads whose session cwd exactly matches this path are returned.",
"type": [
"string",
"null"
]
"anyOf": [
{
"$ref": "#/definitions/ThreadListCwdFilter"
},
{
"type": "null"
}
],
"description": "Optional cwd filter or filters; when set, only threads whose session cwd exactly matches one of these paths are returned."
},
"limit": {
"description": "Optional page size; defaults to a reasonable server-side value.",
@@ -3162,6 +3179,10 @@
"array",
"null"
]
},
"useStateDbOnly": {
"description": "If true, return from the state DB without scanning JSONL rollouts to repair thread metadata. Omitted or false preserves scan-and-repair behavior.",
"type": "boolean"
}
},
"type": "object"
@@ -14868,6 +14868,19 @@
}
]
},
"ThreadListCwdFilter": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"type": "string"
},
"type": "array"
}
]
},
"ThreadListParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
@@ -14886,11 +14899,15 @@
]
},
"cwd": {
"description": "Optional cwd filter; when set, only threads whose session cwd exactly matches this path are returned.",
"type": [
"string",
"null"
]
"anyOf": [
{
"$ref": "#/definitions/v2/ThreadListCwdFilter"
},
{
"type": "null"
}
],
"description": "Optional cwd filter or filters; when set, only threads whose session cwd exactly matches one of these paths are returned."
},
"limit": {
"description": "Optional page size; defaults to a reasonable server-side value.",
@@ -14949,6 +14966,10 @@
"array",
"null"
]
},
"useStateDbOnly": {
"description": "If true, return from the state DB without scanning JSONL rollouts to repair thread metadata. Omitted or false preserves scan-and-repair behavior.",
"type": "boolean"
}
},
"title": "ThreadListParams",
@@ -12762,6 +12762,19 @@
}
]
},
"ThreadListCwdFilter": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"type": "string"
},
"type": "array"
}
]
},
"ThreadListParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
@@ -12780,11 +12793,15 @@
]
},
"cwd": {
"description": "Optional cwd filter; when set, only threads whose session cwd exactly matches this path are returned.",
"type": [
"string",
"null"
]
"anyOf": [
{
"$ref": "#/definitions/ThreadListCwdFilter"
},
{
"type": "null"
}
],
"description": "Optional cwd filter or filters; when set, only threads whose session cwd exactly matches one of these paths are returned."
},
"limit": {
"description": "Optional page size; defaults to a reasonable server-side value.",
@@ -12843,6 +12860,10 @@
"array",
"null"
]
},
"useStateDbOnly": {
"description": "If true, return from the state DB without scanning JSONL rollouts to repair thread metadata. Omitted or false preserves scan-and-repair behavior.",
"type": "boolean"
}
},
"title": "ThreadListParams",
@@ -8,6 +8,19 @@
],
"type": "string"
},
"ThreadListCwdFilter": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"type": "string"
},
"type": "array"
}
]
},
"ThreadSortKey": {
"enum": [
"created_at",
@@ -47,11 +60,15 @@
]
},
"cwd": {
"description": "Optional cwd filter; when set, only threads whose session cwd exactly matches this path are returned.",
"type": [
"string",
"null"
]
"anyOf": [
{
"$ref": "#/definitions/ThreadListCwdFilter"
},
{
"type": "null"
}
],
"description": "Optional cwd filter or filters; when set, only threads whose session cwd exactly matches one of these paths are returned."
},
"limit": {
"description": "Optional page size; defaults to a reasonable server-side value.",
@@ -110,6 +127,10 @@
"array",
"null"
]
},
"useStateDbOnly": {
"description": "If true, return from the state DB without scanning JSONL rollouts to repair thread metadata. Omitted or false preserves scan-and-repair behavior.",
"type": "boolean"
}
},
"title": "ThreadListParams",
@@ -38,10 +38,16 @@ sourceKinds?: Array<ThreadSourceKind> | null,
*/
archived?: boolean | null,
/**
* Optional cwd filter; when set, only threads whose session cwd exactly
* matches this path are returned.
* Optional cwd filter or filters; when set, only threads whose session cwd
* exactly matches one of these paths are returned.
*/
cwd?: string | null,
cwd?: string | Array<string> | null,
/**
* If true, return from the state DB without scanning JSONL rollouts to
* repair thread metadata. Omitted or false preserves scan-and-repair
* behavior.
*/
useStateDbOnly?: boolean,
/**
* Optional substring filter for the extracted thread title.
*/
@@ -3703,15 +3703,27 @@ pub struct ThreadListParams {
/// If false or null, only non-archived threads are returned.
#[ts(optional = nullable)]
pub archived: Option<bool>,
/// Optional cwd filter; when set, only threads whose session cwd exactly
/// matches this path are returned.
#[ts(optional = nullable)]
pub cwd: Option<String>,
/// Optional cwd filter or filters; when set, only threads whose session cwd
/// exactly matches one of these paths are returned.
#[ts(optional = nullable, type = "string | Array<string> | null")]
pub cwd: Option<ThreadListCwdFilter>,
/// If true, return from the state DB without scanning JSONL rollouts to
/// repair thread metadata. Omitted or false preserves scan-and-repair
/// behavior.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub use_state_db_only: bool,
/// Optional substring filter for the extracted thread title.
#[ts(optional = nullable)]
pub search_term: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)]
#[serde(untagged)]
pub enum ThreadListCwdFilter {
One(String),
Many(Vec<String>),
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase", export_to = "v2/")]
@@ -7294,6 +7306,46 @@ mod tests {
absolute_path("readable")
}
#[test]
fn thread_list_params_accepts_single_cwd() {
let params = serde_json::from_value::<ThreadListParams>(json!({
"cwd": "/workspace",
}))
.expect("single cwd should deserialize");
assert_eq!(
params.cwd,
Some(ThreadListCwdFilter::One("/workspace".to_string()))
);
assert!(!params.use_state_db_only);
}
#[test]
fn thread_list_params_accepts_multiple_cwds() {
let params = serde_json::from_value::<ThreadListParams>(json!({
"cwd": ["/workspace", "/other-workspace"],
}))
.expect("cwd array should deserialize");
assert_eq!(
params.cwd,
Some(ThreadListCwdFilter::Many(vec![
"/workspace".to_string(),
"/other-workspace".to_string(),
]))
);
}
#[test]
fn thread_list_params_accepts_state_db_only_flag() {
let params = serde_json::from_value::<ThreadListParams>(json!({
"useStateDbOnly": true,
}))
.expect("state db only flag should deserialize");
assert!(params.use_state_db_only);
}
#[test]
fn collab_agent_state_maps_interrupted_status() {
assert_eq!(
@@ -1129,6 +1129,7 @@ async fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u3
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})?;
println!("< thread/list response: {response:?}");
+3 -1
View File
@@ -290,7 +290,8 @@ Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `per
- `modelProviders` — restrict results to specific providers; unset, null, or an empty array will include all providers.
- `sourceKinds` — restrict results to specific sources; omit or pass `[]` for interactive sessions only (`cli`, `vscode`).
- `archived` — when `true`, list archived threads only. When `false` or `null`, list non-archived threads (default).
- `cwd` — restrict results to threads whose session cwd exactly matches this path. Relative paths are resolved against the app-server process cwd before matching.
- `cwd` — restrict results to threads whose session cwd exactly matches this path, or one of these paths when an array is provided. Relative paths are resolved against the app-server process cwd before matching.
- `useStateDbOnly` — when `true`, return from the state DB without scanning JSONL rollouts to repair metadata. Omit or pass `false` to preserve the default scan-and-repair behavior.
- `searchTerm` — restrict results to threads whose extracted title contains this substring (case-sensitive).
- Responses include `nextCursor` to continue in the same direction and `backwardsCursor` to pass as `cursor` when reversing `sortDirection`.
- Responses include `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available.
@@ -301,6 +302,7 @@ Example:
{ "method": "thread/list", "id": 20, "params": {
"cursor": null,
"limit": 25,
"cwd": ["/Users/me/project", "/Users/me/project-worktree"],
"sortKey": "created_at"
} }
{ "id": 20, "result": {
@@ -150,6 +150,7 @@ use codex_app_server_protocol::ThreadIncrementElicitationResponse;
use codex_app_server_protocol::ThreadInjectItemsParams;
use codex_app_server_protocol::ThreadInjectItemsResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListCwdFilter;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadLoadedListParams;
@@ -402,8 +403,9 @@ struct ThreadListFilters {
model_providers: Option<Vec<String>>,
source_kinds: Option<Vec<ThreadSourceKind>>,
archived: bool,
cwd: Option<PathBuf>,
cwd_filters: Option<Vec<PathBuf>>,
search_term: Option<String>,
use_state_db_only: bool,
}
// Duration before a browser ChatGPT login attempt is abandoned.
@@ -3722,10 +3724,11 @@ impl CodexMessageProcessor {
source_kinds,
archived,
cwd,
use_state_db_only,
search_term,
} = params;
let cwd = match normalize_thread_list_cwd_filter(cwd) {
Ok(cwd) => cwd,
let cwd_filters = match normalize_thread_list_cwd_filters(cwd) {
Ok(cwd_filters) => cwd_filters,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
@@ -3751,8 +3754,9 @@ impl CodexMessageProcessor {
model_providers,
source_kinds,
archived: archived.unwrap_or(false),
cwd,
cwd_filters,
search_term,
use_state_db_only,
},
)
.await;
@@ -5217,8 +5221,9 @@ impl CodexMessageProcessor {
model_providers,
source_kinds,
archived,
cwd,
cwd_filters,
search_term,
use_state_db_only,
} = filters;
let mut cursor_obj = cursor;
let mut last_cursor = cursor_obj.clone();
@@ -5255,28 +5260,27 @@ impl CodexMessageProcessor {
sort_direction: store_sort_direction,
allowed_sources: allowed_sources.to_vec(),
model_providers: model_provider_filter.clone(),
cwd_filters: cwd_filters.clone(),
archived,
search_term: search_term.clone(),
use_state_db_only,
})
.await
.map_err(thread_store_list_error)?;
let mut candidate_summaries = Vec::with_capacity(page.items.len());
let mut filtered = Vec::with_capacity(page.items.len());
for it in page.items {
let Some(summary) = summary_from_stored_thread(it, fallback_provider.as_str())
else {
continue;
};
candidate_summaries.push(summary);
}
let mut filtered = Vec::with_capacity(candidate_summaries.len());
for summary in candidate_summaries {
if source_kind_filter
.as_ref()
.is_none_or(|filter| source_kind_matches(&summary.source, filter))
&& cwd.as_ref().is_none_or(|expected_cwd| {
path_utils::paths_match_after_normalization(&summary.cwd, expected_cwd)
&& cwd_filters.as_ref().is_none_or(|expected_cwds| {
expected_cwds.iter().any(|expected_cwd| {
path_utils::paths_match_after_normalization(&summary.cwd, expected_cwd)
})
})
{
filtered.push(summary);
@@ -5288,25 +5292,22 @@ impl CodexMessageProcessor {
items.extend(filtered);
remaining = requested_page_size.saturating_sub(items.len());
let next_cursor_value = page.next_cursor.clone();
next_cursor = next_cursor_value.clone();
next_cursor = page.next_cursor;
if remaining == 0 {
break;
}
match next_cursor_value {
Some(cursor_val) if remaining > 0 => {
// Break if our pagination would reuse the same cursor again; this avoids
// an infinite loop when filtering drops everything on the page.
if last_cursor.as_ref() == Some(&cursor_val) {
next_cursor = None;
break;
}
last_cursor = Some(cursor_val.clone());
cursor_obj = Some(cursor_val);
}
_ => break,
let Some(cursor_val) = next_cursor.clone() else {
break;
};
// Break if our pagination would reuse the same cursor again; this avoids
// an infinite loop when filtering drops everything on the page.
if last_cursor.as_ref() == Some(&cursor_val) {
next_cursor = None;
break;
}
last_cursor = Some(cursor_val.clone());
cursor_obj = Some(cursor_val);
}
Ok((items, next_cursor))
@@ -8273,25 +8274,36 @@ impl CodexMessageProcessor {
}
}
fn normalize_thread_list_cwd_filter(
cwd: Option<String>,
) -> Result<Option<PathBuf>, JSONRPCErrorError> {
fn normalize_thread_list_cwd_filters(
cwd: Option<ThreadListCwdFilter>,
) -> Result<Option<Vec<PathBuf>>, JSONRPCErrorError> {
let Some(cwd) = cwd else {
return Ok(None);
};
AbsolutePathBuf::relative_to_current_dir(cwd.as_str())
.map(AbsolutePathBuf::into_path_buf)
.map(Some)
.map_err(|err| JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
message: format!("invalid thread/list cwd filter `{cwd}`: {err}"),
data: None,
})
let cwds = match cwd {
ThreadListCwdFilter::One(cwd) => vec![cwd],
ThreadListCwdFilter::Many(cwds) => cwds,
};
let mut normalized_cwds = Vec::with_capacity(cwds.len());
for cwd in cwds {
let cwd = AbsolutePathBuf::relative_to_current_dir(cwd.as_str())
.map(AbsolutePathBuf::into_path_buf)
.map_err(|err| JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
message: format!("invalid thread/list cwd filter `{cwd}`: {err}"),
data: None,
})?;
normalized_cwds.push(cwd);
}
Ok(Some(normalized_cwds))
}
#[cfg(test)]
mod thread_list_cwd_filter_tests {
use super::normalize_thread_list_cwd_filter;
use super::normalize_thread_list_cwd_filters;
use codex_app_server_protocol::ThreadListCwdFilter;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
@@ -8305,8 +8317,9 @@ mod thread_list_cwd_filter_tests {
};
assert_eq!(
normalize_thread_list_cwd_filter(Some(cwd.clone())).expect("cwd filter should parse"),
Some(PathBuf::from(cwd))
normalize_thread_list_cwd_filters(Some(ThreadListCwdFilter::One(cwd.clone())))
.expect("cwd filter should parse"),
Some(vec![PathBuf::from(cwd)])
);
}
@@ -8316,9 +8329,11 @@ mod thread_list_cwd_filter_tests {
let expected = AbsolutePathBuf::relative_to_current_dir("repo-b")?.to_path_buf();
assert_eq!(
normalize_thread_list_cwd_filter(Some(String::from("repo-b")))
.expect("cwd filter should parse"),
Some(expected)
normalize_thread_list_cwd_filters(Some(ThreadListCwdFilter::Many(vec![String::from(
"repo-b"
),])))
.expect("cwd filter should parse"),
Some(vec![expected])
);
Ok(())
}
@@ -548,6 +548,7 @@ async fn thread_fork_ephemeral_remains_pathless_and_omits_listing() -> Result<()
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
+169 -12
View File
@@ -15,6 +15,7 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::SortDirection;
use codex_app_server_protocol::ThreadListCwdFilter;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
@@ -90,6 +91,7 @@ async fn list_threads_with_sort(
source_kinds,
archived,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
@@ -468,15 +470,23 @@ async fn thread_list_respects_provider_filter() -> Result<()> {
}
#[tokio::test]
async fn thread_list_respects_cwd_filter() -> Result<()> {
async fn thread_list_respects_cwd_filters() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
let filtered_id = create_fake_rollout(
let first_filtered_id = create_fake_rollout(
codex_home.path(),
"2025-01-02T10-00-00",
"2025-01-02T10:00:00Z",
"filtered",
"first filtered",
Some("mock_provider"),
/*git_info*/ None,
)?;
let second_filtered_id = create_fake_rollout(
codex_home.path(),
"2025-01-02T12-00-00",
"2025-01-02T12:00:00Z",
"second filtered",
Some("mock_provider"),
/*git_info*/ None,
)?;
@@ -489,11 +499,22 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
/*git_info*/ None,
)?;
let target_cwd = codex_home.path().join("target-cwd");
fs::create_dir_all(&target_cwd)?;
let first_target_cwd = codex_home.path().join("first-target-cwd");
let second_target_cwd = codex_home.path().join("second-target-cwd");
fs::create_dir_all(&first_target_cwd)?;
fs::create_dir_all(&second_target_cwd)?;
set_rollout_cwd(
rollout_path(codex_home.path(), "2025-01-02T10-00-00", &filtered_id).as_path(),
&target_cwd,
rollout_path(codex_home.path(), "2025-01-02T10-00-00", &first_filtered_id).as_path(),
&first_target_cwd,
)?;
set_rollout_cwd(
rollout_path(
codex_home.path(),
"2025-01-02T12-00-00",
&second_filtered_id,
)
.as_path(),
&second_target_cwd,
)?;
let mut mcp = init_mcp(codex_home.path()).await?;
@@ -506,7 +527,11 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,
cwd: Some(target_cwd.to_string_lossy().into_owned()),
cwd: Some(ThreadListCwdFilter::Many(vec![
first_target_cwd.to_string_lossy().into_owned(),
second_target_cwd.to_string_lossy().into_owned(),
])),
use_state_db_only: false,
search_term: None,
})
.await?;
@@ -520,10 +545,14 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
} = to_response::<ThreadListResponse>(resp)?;
assert_eq!(next_cursor, None);
assert_eq!(data.len(), 1);
assert_eq!(data[0].id, filtered_id);
assert_ne!(data[0].id, unfiltered_id);
assert_eq!(data[0].cwd.as_path(), target_cwd.as_path());
let filtered_ids: Vec<_> = data.iter().map(|thread| thread.id.as_str()).collect();
assert_eq!(
filtered_ids,
vec![second_filtered_id.as_str(), first_filtered_id.as_str()]
);
assert!(!filtered_ids.contains(&unfiltered_id.as_str()));
assert_eq!(data[0].cwd.as_path(), second_target_cwd.as_path());
assert_eq!(data[1].cwd.as_path(), first_target_cwd.as_path());
Ok(())
}
@@ -592,6 +621,7 @@ sqlite = true
codex_core::SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ None,
"mock_provider",
/*search_term*/ None,
)
@@ -609,6 +639,7 @@ sqlite = true
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: Some("needle".to_string()),
})
.await?;
@@ -628,6 +659,129 @@ sqlite = true
Ok(())
}
#[tokio::test]
async fn thread_list_state_db_only_returns_sqlite_without_jsonl_repair() -> Result<()> {
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join("config.toml"),
r#"
model = "mock-model"
approval_policy = "never"
suppress_unstable_features_warning = true
[features]
sqlite = true
"#,
)?;
let thread_id = create_fake_rollout(
codex_home.path(),
"2025-01-02T10-00-00",
"2025-01-02T10:00:00Z",
"state db only should not see this before repair",
Some("mock_provider"),
/*git_info*/ None,
)?;
let state_db =
codex_state::StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into())
.await?;
state_db
.mark_backfill_complete(/*last_watermark*/ None)
.await?;
let mut mcp = init_mcp(codex_home.path()).await?;
let request_id = mcp
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let repaired_response = to_response::<ThreadListResponse>(resp)?;
let ids: Vec<_> = repaired_response
.data
.iter()
.map(|thread| thread.id.as_str())
.collect();
assert_eq!(ids, vec![thread_id.as_str()]);
let thread_uuid = ThreadId::from_string(&thread_id)?;
let stale_cwd = codex_home.path().join("stale-cwd");
let mut metadata = state_db
.get_thread(thread_uuid)
.await?
.expect("thread should be repaired into sqlite");
metadata.cwd = stale_cwd.clone();
state_db.upsert_thread(&metadata).await?;
let request_id = mcp
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,
cwd: Some(ThreadListCwdFilter::One(
stale_cwd.to_string_lossy().into_owned(),
)),
use_state_db_only: true,
search_term: None,
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let state_db_only_response = to_response::<ThreadListResponse>(resp)?;
let ids: Vec<_> = state_db_only_response
.data
.iter()
.map(|thread| thread.id.as_str())
.collect();
assert_eq!(ids, vec![thread_id.as_str()]);
let request_id = mcp
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,
cwd: Some(ThreadListCwdFilter::One(
stale_cwd.to_string_lossy().into_owned(),
)),
use_state_db_only: false,
search_term: None,
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let scanned_response = to_response::<ThreadListResponse>(resp)?;
assert_eq!(scanned_response.data.len(), 0);
Ok(())
}
#[tokio::test]
async fn thread_list_empty_source_kinds_defaults_to_interactive_only() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -1307,6 +1461,7 @@ async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<()
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
@@ -1348,6 +1503,7 @@ async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<()
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
@@ -1585,6 +1741,7 @@ async fn thread_list_invalid_cursor_returns_error() -> Result<()> {
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
@@ -548,6 +548,7 @@ async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> {
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
@@ -82,8 +82,10 @@ async fn has_threads(store: &LocalThreadStore, archived: bool) -> io::Result<boo
sort_direction: codex_thread_store::SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
cwd_filters: None,
archived,
search_term: None,
use_state_db_only: false,
})
.await
.map(|page| !page.items.is_empty())
+2
View File
@@ -136,8 +136,10 @@ async fn load_recent_threads(sess: &Session) -> Vec<StoredThread> {
sort_direction: SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
cwd_filters: None,
archived: false,
search_term: None,
use_state_db_only: false,
})
.await
{
+1
View File
@@ -183,6 +183,7 @@ impl AppServerClient {
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
},
};
+2
View File
@@ -1256,6 +1256,7 @@ async fn resolve_resume_thread_id(
source_kinds: Some(all_thread_source_kinds()),
archived: Some(false),
cwd: None,
use_state_db_only: false,
search_term: None,
},
},
@@ -1319,6 +1320,7 @@ async fn resolve_resume_thread_id(
source_kinds: Some(all_thread_source_kinds()),
archived: Some(false),
cwd: None,
use_state_db_only: false,
search_term: Some(session_id.to_string()),
},
},
+58 -10
View File
@@ -1,6 +1,7 @@
#![allow(warnings, clippy::all)]
use async_trait::async_trait;
use codex_utils_path as path_utils;
use std::cmp::Reverse;
use std::ffi::OsStr;
use std::io;
@@ -126,6 +127,7 @@ pub enum ThreadListLayout {
pub struct ThreadListConfig<'a> {
pub allowed_sources: &'a [SessionSource],
pub model_providers: Option<&'a [String]>,
pub cwd_filters: Option<&'a [PathBuf]>,
pub default_provider: &'a str,
pub layout: ThreadListLayout,
}
@@ -207,6 +209,7 @@ struct FilesByCreatedAtVisitor<'a> {
more_matches_available: bool,
allowed_sources: &'a [SessionSource],
provider_matcher: Option<&'a ProviderMatcher<'a>>,
cwd_filters: Option<&'a [PathBuf]>,
}
#[async_trait]
@@ -237,6 +240,7 @@ impl<'a> RolloutFileVisitor for FilesByCreatedAtVisitor<'a> {
path,
self.allowed_sources,
self.provider_matcher,
self.cwd_filters,
updated_at,
)
.await
@@ -317,6 +321,7 @@ pub async fn get_threads(
sort_key: ThreadSortKey,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
default_provider: &str,
) -> io::Result<ThreadsPage> {
let root = codex_home.join(SESSIONS_SUBDIR);
@@ -328,6 +333,7 @@ pub async fn get_threads(
ThreadListConfig {
allowed_sources,
model_providers,
cwd_filters,
default_provider,
layout: ThreadListLayout::NestedByDate,
},
@@ -366,6 +372,7 @@ pub async fn get_threads_in_root(
sort_key,
config.allowed_sources,
provider_matcher.as_ref(),
config.cwd_filters,
)
.await?
}
@@ -377,6 +384,7 @@ pub async fn get_threads_in_root(
sort_key,
config.allowed_sources,
provider_matcher.as_ref(),
config.cwd_filters,
)
.await?
}
@@ -395,6 +403,7 @@ async fn traverse_directories_for_paths(
sort_key: ThreadSortKey,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filters: Option<&[PathBuf]>,
) -> io::Result<ThreadsPage> {
match sort_key {
ThreadSortKey::CreatedAt => {
@@ -404,6 +413,7 @@ async fn traverse_directories_for_paths(
anchor,
allowed_sources,
provider_matcher,
cwd_filters,
)
.await
}
@@ -414,6 +424,7 @@ async fn traverse_directories_for_paths(
anchor,
allowed_sources,
provider_matcher,
cwd_filters,
)
.await
}
@@ -427,15 +438,30 @@ async fn traverse_flat_paths(
sort_key: ThreadSortKey,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filters: Option<&[PathBuf]>,
) -> io::Result<ThreadsPage> {
match sort_key {
ThreadSortKey::CreatedAt => {
traverse_flat_paths_created(root, page_size, anchor, allowed_sources, provider_matcher)
.await
traverse_flat_paths_created(
root,
page_size,
anchor,
allowed_sources,
provider_matcher,
cwd_filters,
)
.await
}
ThreadSortKey::UpdatedAt => {
traverse_flat_paths_updated(root, page_size, anchor, allowed_sources, provider_matcher)
.await
traverse_flat_paths_updated(
root,
page_size,
anchor,
allowed_sources,
provider_matcher,
cwd_filters,
)
.await
}
}
}
@@ -452,6 +478,7 @@ async fn traverse_directories_for_paths_created(
anchor: Option<Cursor>,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filters: Option<&[PathBuf]>,
) -> io::Result<ThreadsPage> {
let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
@@ -463,6 +490,7 @@ async fn traverse_directories_for_paths_created(
more_matches_available,
allowed_sources,
provider_matcher,
cwd_filters,
};
walk_rollout_files(&root, &mut scanned_files, &mut visitor).await?;
more_matches_available = visitor.more_matches_available;
@@ -499,14 +527,14 @@ async fn traverse_directories_for_paths_updated(
anchor: Option<Cursor>,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filters: Option<&[PathBuf]>,
) -> io::Result<ThreadsPage> {
let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
let mut anchor_state = AnchorState::new(anchor);
let mut more_matches_available = false;
let candidates = collect_files_by_updated_at(&root, &mut scanned_files).await?;
let mut candidates = candidates;
let mut candidates = collect_files_by_updated_at(&root, &mut scanned_files).await?;
candidates.sort_by_key(|candidate| {
let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH);
(Reverse(ts), Reverse(candidate.id))
@@ -527,6 +555,7 @@ async fn traverse_directories_for_paths_updated(
candidate.path,
allowed_sources,
provider_matcher,
cwd_filters,
updated_at_fallback,
)
.await
@@ -559,6 +588,7 @@ async fn traverse_flat_paths_created(
anchor: Option<Cursor>,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filters: Option<&[PathBuf]>,
) -> io::Result<ThreadsPage> {
let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
@@ -578,8 +608,14 @@ async fn traverse_flat_paths_created(
.await
.unwrap_or(None)
.and_then(format_rfc3339);
if let Some(item) =
build_thread_item(path, allowed_sources, provider_matcher, updated_at).await
if let Some(item) = build_thread_item(
path,
allowed_sources,
provider_matcher,
cwd_filters,
updated_at,
)
.await
{
items.push(item);
}
@@ -609,14 +645,14 @@ async fn traverse_flat_paths_updated(
anchor: Option<Cursor>,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filters: Option<&[PathBuf]>,
) -> io::Result<ThreadsPage> {
let mut items: Vec<ThreadItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
let mut anchor_state = AnchorState::new(anchor);
let mut more_matches_available = false;
let candidates = collect_flat_files_by_updated_at(&root, &mut scanned_files).await?;
let mut candidates = candidates;
let mut candidates = collect_flat_files_by_updated_at(&root, &mut scanned_files).await?;
candidates.sort_by_key(|candidate| {
let ts = candidate.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH);
(Reverse(ts), Reverse(candidate.id))
@@ -637,6 +673,7 @@ async fn traverse_flat_paths_updated(
candidate.path,
allowed_sources,
provider_matcher,
cwd_filters,
updated_at_fallback,
)
.await
@@ -698,6 +735,7 @@ async fn build_thread_item(
path: PathBuf,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
cwd_filters: Option<&[PathBuf]>,
updated_at: Option<String>,
) -> Option<ThreadItem> {
// Read head and detect message events; stop once meta + user are found.
@@ -717,6 +755,15 @@ async fn build_thread_item(
{
return None;
}
if let Some(cwd_filters) = cwd_filters
&& !summary.cwd.as_ref().is_some_and(|cwd| {
cwd_filters
.iter()
.any(|filter| path_utils::paths_match_after_normalization(cwd, filter))
})
{
return None;
}
// Apply filters: must have session meta and at least one user message event
if summary.saw_session_meta && summary.saw_user_event {
let HeadTailSummary {
@@ -768,6 +815,7 @@ pub async fn read_thread_item_from_rollout(path: PathBuf) -> Option<ThreadItem>
path,
&[],
/*provider_matcher*/ None,
/*cwd_filters*/ None,
/*updated_at*/ None,
)
.await
+223 -33
View File
@@ -214,6 +214,18 @@ fn sanitize_rollout_item_for_persistence(
}
}
#[derive(Clone, Copy)]
enum ThreadListArchiveFilter {
Active,
Archived,
}
#[derive(Clone, Copy)]
enum ThreadListRepairMode {
ScanAndRepair,
StateDbOnly,
}
impl RolloutRecorder {
/// List threads (rollout files) under the provided Codex home directory.
#[allow(clippy::too_many_arguments)]
@@ -225,6 +237,7 @@ impl RolloutRecorder {
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
default_provider: &str,
search_term: Option<&str>,
) -> std::io::Result<ThreadsPage> {
@@ -236,8 +249,40 @@ impl RolloutRecorder {
sort_direction,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
/*archived*/ false,
ThreadListArchiveFilter::Active,
ThreadListRepairMode::ScanAndRepair,
search_term,
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn list_threads_from_state_db(
config: &impl RolloutConfigView,
page_size: usize,
cursor: Option<&Cursor>,
sort_key: ThreadSortKey,
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
default_provider: &str,
search_term: Option<&str>,
) -> std::io::Result<ThreadsPage> {
Self::list_threads_with_db_fallback(
config,
page_size,
cursor,
sort_key,
sort_direction,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
ThreadListArchiveFilter::Active,
ThreadListRepairMode::StateDbOnly,
search_term,
)
.await
@@ -253,6 +298,7 @@ impl RolloutRecorder {
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
default_provider: &str,
search_term: Option<&str>,
) -> std::io::Result<ThreadsPage> {
@@ -264,8 +310,40 @@ impl RolloutRecorder {
sort_direction,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
/*archived*/ true,
ThreadListArchiveFilter::Archived,
ThreadListRepairMode::ScanAndRepair,
search_term,
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn list_archived_threads_from_state_db(
config: &impl RolloutConfigView,
page_size: usize,
cursor: Option<&Cursor>,
sort_key: ThreadSortKey,
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
default_provider: &str,
search_term: Option<&str>,
) -> std::io::Result<ThreadsPage> {
Self::list_threads_with_db_fallback(
config,
page_size,
cursor,
sort_key,
sort_direction,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
ThreadListArchiveFilter::Archived,
ThreadListRepairMode::StateDbOnly,
search_term,
)
.await
@@ -280,19 +358,24 @@ impl RolloutRecorder {
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
default_provider: &str,
archived: bool,
archive_filter: ThreadListArchiveFilter,
repair_mode: ThreadListRepairMode,
search_term: Option<&str>,
) -> std::io::Result<ThreadsPage> {
let codex_home = config.codex_home();
let state_db_ctx = state_db::get_state_db(config).await;
let archived = match archive_filter {
ThreadListArchiveFilter::Active => false,
ThreadListArchiveFilter::Archived => true,
};
if cwd_filters.is_some_and(<[std::path::PathBuf]>::is_empty) {
return Ok(ThreadsPage::default());
}
// Search is the SQLite-optimized path and assumes a DB marked backfill-complete is
// actually populated enough to answer the query. If unmigrated rollout files still exist
// on disk, the repair path below may or may not run and catch them depending on whether
// SQLite already has another matching search hit.
if search_term.is_some()
&& let Some(db_page) = state_db::list_threads_db(
if matches!(repair_mode, ThreadListRepairMode::StateDbOnly) {
return Ok(state_db::list_threads_db(
state_db_ctx.as_deref(),
codex_home,
page_size,
@@ -301,18 +384,22 @@ impl RolloutRecorder {
sort_direction,
allowed_sources,
model_providers,
cwd_filters,
archived,
search_term,
)
.await
&& (!db_page.items.is_empty() || cursor.is_some())
{
return Ok(db_page.into());
.map(Into::into)
.unwrap_or_default());
}
let listing_has_metadata_filters = !allowed_sources.is_empty()
|| model_providers.is_some()
|| cwd_filters.is_some()
|| search_term.is_some();
// Filesystem-first listing intentionally overfetches so we can repair stale/missing
// SQLite rollout paths before the final DB-backed page is returned.
let fs_page_size = page_size.saturating_mul(2).max(page_size);
// SQLite rows before returning the scan page for filtered listings or the DB page for
// unfiltered listings.
let fs_page = match sort_direction {
SortDirection::Asc => {
list_threads_from_files_asc(
@@ -322,6 +409,7 @@ impl RolloutRecorder {
sort_key,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
archived,
search_term,
@@ -331,11 +419,12 @@ impl RolloutRecorder {
SortDirection::Desc => {
list_threads_from_files_desc(
codex_home,
fs_page_size,
page_size.saturating_mul(2),
cursor,
sort_key,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
archived,
search_term,
@@ -347,24 +436,39 @@ impl RolloutRecorder {
if state_db_ctx.is_none() {
// Keep legacy behavior when SQLite is unavailable: return filesystem results
// at the requested page size.
return Ok(match sort_direction {
SortDirection::Asc => fs_page,
SortDirection::Desc => truncate_fs_page(fs_page, page_size, sort_key),
});
return Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
page_size,
sort_key,
));
}
// Warm the DB by repairing every filesystem hit before querying SQLite.
for item in &fs_page.items {
state_db::read_repair_rollout_path(
state_db_ctx.as_deref(),
item.thread_id,
Some(archived),
item.path.as_path(),
)
.await;
if listing_has_metadata_filters {
state_db::reconcile_rollout(
state_db_ctx.as_deref(),
item.path.as_path(),
default_provider,
/*builder*/ None,
&[],
Some(archived),
/*new_thread_memory_mode*/ None,
)
.await;
} else {
state_db::read_repair_rollout_path(
state_db_ctx.as_deref(),
item.thread_id,
Some(archived),
item.path.as_path(),
)
.await;
}
}
if let Some(db_page) = state_db::list_threads_db(
let db_page = state_db::list_threads_db(
state_db_ctx.as_deref(),
codex_home,
page_size,
@@ -373,20 +477,83 @@ impl RolloutRecorder {
sort_direction,
allowed_sources,
model_providers,
cwd_filters,
archived,
search_term,
)
.await
{
.await;
if let Some(db_page) = db_page {
if search_term.is_some() && (!db_page.items.is_empty() || cursor.is_some()) {
for item in &db_page.items {
state_db::reconcile_rollout(
state_db_ctx.as_deref(),
item.rollout_path.as_path(),
default_provider,
/*builder*/ None,
&[],
Some(archived),
/*new_thread_memory_mode*/ None,
)
.await;
}
if let Some(repaired_db_page) = state_db::list_threads_db(
state_db_ctx.as_deref(),
codex_home,
page_size,
cursor,
sort_key,
sort_direction,
allowed_sources,
model_providers,
cwd_filters,
archived,
search_term,
)
.await
{
return Ok(repaired_db_page.into());
}
return Ok(db_page.into());
}
if listing_has_metadata_filters {
for item in &db_page.items {
state_db::reconcile_rollout(
state_db_ctx.as_deref(),
item.rollout_path.as_path(),
default_provider,
/*builder*/ None,
&[],
Some(archived),
/*new_thread_memory_mode*/ None,
)
.await;
}
return Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
page_size,
sort_key,
));
}
return Ok(db_page.into());
}
if listing_has_metadata_filters {
return Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
page_size,
sort_key,
));
}
// If SQLite listing still fails, return the filesystem page rather than failing the list.
tracing::error!("Falling back on rollout system");
tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back");
Ok(match sort_direction {
SortDirection::Asc => fs_page,
SortDirection::Desc => truncate_fs_page(fs_page, page_size, sort_key),
})
Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
page_size,
sort_key,
))
}
/// Find the newest recorded thread path, optionally filtering to a matching cwd.
@@ -403,6 +570,7 @@ impl RolloutRecorder {
) -> std::io::Result<Option<PathBuf>> {
let codex_home = config.codex_home();
let state_db_ctx = state_db::get_state_db(config).await;
let cwd_filter = filter_cwd.map(Path::to_path_buf);
if state_db_ctx.is_some() {
let mut db_cursor = cursor.cloned();
loop {
@@ -415,6 +583,7 @@ impl RolloutRecorder {
SortDirection::Desc,
allowed_sources,
model_providers,
cwd_filter.as_ref().map(std::slice::from_ref),
/*archived*/ false,
/*search_term*/ None,
)
@@ -443,6 +612,7 @@ impl RolloutRecorder {
sort_key,
allowed_sources,
model_providers,
cwd_filter.as_ref().map(std::slice::from_ref),
default_provider,
)
.await?;
@@ -796,6 +966,18 @@ fn truncate_fs_page(
page
}
fn page_from_filesystem_scan(
page: ThreadsPage,
sort_direction: SortDirection,
page_size: usize,
sort_key: ThreadSortKey,
) -> ThreadsPage {
match sort_direction {
SortDirection::Asc => page,
SortDirection::Desc => truncate_fs_page(page, page_size, sort_key),
}
}
#[allow(clippy::too_many_arguments)]
async fn list_threads_from_files_desc(
codex_home: &Path,
@@ -804,6 +986,7 @@ async fn list_threads_from_files_desc(
sort_key: ThreadSortKey,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
default_provider: &str,
archived: bool,
search_term: Option<&str>,
@@ -823,6 +1006,7 @@ async fn list_threads_from_files_desc(
sort_key,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
archived,
)
@@ -864,6 +1048,7 @@ async fn list_threads_from_files_desc(
sort_key,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
archived,
)
@@ -878,6 +1063,7 @@ async fn list_threads_from_files_desc_unfiltered(
sort_key: ThreadSortKey,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
default_provider: &str,
archived: bool,
) -> std::io::Result<ThreadsPage> {
@@ -891,6 +1077,7 @@ async fn list_threads_from_files_desc_unfiltered(
ThreadListConfig {
allowed_sources,
model_providers,
cwd_filters,
default_provider,
layout: ThreadListLayout::Flat,
},
@@ -904,6 +1091,7 @@ async fn list_threads_from_files_desc_unfiltered(
sort_key,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
)
.await
@@ -918,6 +1106,7 @@ async fn list_threads_from_files_asc(
sort_key: ThreadSortKey,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
default_provider: &str,
archived: bool,
search_term: Option<&str>,
@@ -935,6 +1124,7 @@ async fn list_threads_from_files_asc(
sort_key,
allowed_sources,
model_providers,
cwd_filters,
default_provider,
archived,
/*search_term*/ None,
+272
View File
@@ -375,6 +375,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ None,
default_provider.as_str(),
/*search_term*/ None,
)
@@ -391,6 +392,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ None,
default_provider.as_str(),
/*search_term*/ None,
)
@@ -449,6 +451,7 @@ async fn list_threads_db_enabled_drops_missing_rollout_paths() -> std::io::Resul
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ None,
default_provider.as_str(),
/*search_term*/ None,
)
@@ -512,6 +515,7 @@ async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Resul
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ None,
default_provider.as_str(),
/*search_term*/ None,
)
@@ -527,6 +531,274 @@ async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Resul
Ok(())
}
#[tokio::test]
async fn list_threads_state_db_only_skips_jsonl_repair_scan() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize");
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
let uuid = Uuid::from_u128(9012);
let ts = "2025-01-03T14-00-00";
let day_dir = home.path().join("sessions/2025/01/03");
fs::create_dir_all(&day_dir)?;
let path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
let mut file = File::create(&path)?;
let meta = serde_json::json!({
"timestamp": ts,
"type": "session_meta",
"payload": {
"id": uuid,
"timestamp": ts,
"cwd": home.path().display().to_string(),
"originator": "test_originator",
"cli_version": "test_version",
"source": "cli",
"model_provider": "test-provider",
},
});
writeln!(file, "{meta}")?;
let user_event = serde_json::json!({
"timestamp": ts,
"type": "event_msg",
"payload": {
"type": "user_message",
"message": "Hello from user",
"kind": "plain",
},
});
writeln!(file, "{user_event}")?;
let cwd_filters = [home.path().to_path_buf()];
let state_db_only_page = RolloutRecorder::list_threads_from_state_db(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ Some(cwd_filters.as_slice()),
config.model_provider_id.as_str(),
/*search_term*/ None,
)
.await?;
assert_eq!(state_db_only_page.items.len(), 0);
let repaired_page = RolloutRecorder::list_threads(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ Some(cwd_filters.as_slice()),
config.model_provider_id.as_str(),
/*search_term*/ None,
)
.await?;
assert_eq!(repaired_page.items.len(), 1);
let repaired_state_db_only_page = RolloutRecorder::list_threads_from_state_db(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ Some(cwd_filters.as_slice()),
config.model_provider_id.as_str(),
/*search_term*/ None,
)
.await?;
assert_eq!(repaired_state_db_only_page.items.len(), 1);
Ok(())
}
#[tokio::test]
async fn list_threads_default_filter_returns_filesystem_scan_results() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let uuid = Uuid::from_u128(9013);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let real_path = write_session_file(home.path(), "2025-01-03T13-00-00", uuid)?;
let stale_cwd = home.path().join("stale-cwd");
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize");
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
let created_at = chrono::Utc
.with_ymd_and_hms(2025, 1, 3, 13, 0, 0)
.single()
.expect("valid datetime");
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
real_path,
created_at,
SessionSource::Cli,
);
builder.model_provider = Some(config.model_provider_id.clone());
builder.cwd = stale_cwd.clone();
let mut metadata = builder.build(config.model_provider_id.as_str());
metadata.first_user_message = Some("Hello from user".to_string());
runtime
.upsert_thread(&metadata)
.await
.expect("state db upsert should succeed");
let cwd_filters = [stale_cwd];
let state_db_only_page = RolloutRecorder::list_threads_from_state_db(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ Some(cwd_filters.as_slice()),
config.model_provider_id.as_str(),
/*search_term*/ None,
)
.await?;
assert_eq!(state_db_only_page.items.len(), 1);
let scanned_page = RolloutRecorder::list_threads(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ Some(cwd_filters.as_slice()),
config.model_provider_id.as_str(),
/*search_term*/ None,
)
.await?;
assert_eq!(scanned_page.items.len(), 0);
let repaired_state_db_only_page = RolloutRecorder::list_threads_from_state_db(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ Some(cwd_filters.as_slice()),
config.model_provider_id.as_str(),
/*search_term*/ None,
)
.await?;
assert_eq!(repaired_state_db_only_page.items.len(), 0);
Ok(())
}
#[tokio::test]
async fn list_threads_search_repairs_stale_state_db_hits_before_returning() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let uuid = Uuid::from_u128(9014);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let real_path = write_session_file(home.path(), "2025-01-03T15-00-00", uuid)?;
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize");
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
let created_at = chrono::Utc
.with_ymd_and_hms(2025, 1, 3, 15, 0, 0)
.single()
.expect("valid datetime");
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
real_path,
created_at,
SessionSource::Cli,
);
builder.model_provider = Some(config.model_provider_id.clone());
builder.cwd = home.path().to_path_buf();
let mut metadata = builder.build(config.model_provider_id.as_str());
metadata.title = "needle stale title".to_string();
metadata.first_user_message = Some("stale first user".to_string());
runtime
.upsert_thread(&metadata)
.await
.expect("state db upsert should succeed");
let stale_state_db_only_page = RolloutRecorder::list_threads_from_state_db(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ None,
config.model_provider_id.as_str(),
Some("needle"),
)
.await?;
assert_eq!(stale_state_db_only_page.items.len(), 1);
let scanned_page = RolloutRecorder::list_threads(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ None,
config.model_provider_id.as_str(),
Some("needle"),
)
.await?;
assert_eq!(scanned_page.items.len(), 0);
let repaired_state_db_only_page = RolloutRecorder::list_threads_from_state_db(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
/*cwd_filters*/ None,
config.model_provider_id.as_str(),
Some("needle"),
)
.await?;
assert_eq!(repaired_state_db_only_page.items.len(), 0);
Ok(())
}
#[tokio::test]
async fn resume_candidate_matches_cwd_reads_latest_turn_context() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
+8
View File
@@ -191,6 +191,7 @@ pub async fn list_threads_db(
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
cwd_filters: Option<&[PathBuf]>,
archived: bool,
search_term: Option<&str>,
) -> Option<codex_state::ThreadsPage> {
@@ -213,6 +214,12 @@ pub async fn list_threads_db(
})
.collect();
let model_providers = model_providers.map(<[String]>::to_vec);
let normalized_cwd_filters = cwd_filters.map(|filters| {
filters
.iter()
.map(|cwd| normalize_cwd_for_state_db(cwd))
.collect::<Vec<_>>()
});
match ctx
.list_threads(
page_size,
@@ -220,6 +227,7 @@ pub async fn list_threads_db(
archived_only: archived,
allowed_sources: allowed_sources.as_slice(),
model_providers: model_providers.as_deref(),
cwd_filters: normalized_cwd_filters.as_deref(),
anchor: anchor.as_ref(),
sort_key: match sort_key {
ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt,
+18
View File
@@ -540,6 +540,7 @@ async fn test_list_conversations_latest_first() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -689,6 +690,7 @@ async fn test_pagination_cursor() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -756,6 +758,7 @@ async fn test_pagination_cursor() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -823,6 +826,7 @@ async fn test_pagination_cursor() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -877,6 +881,7 @@ async fn test_list_threads_scans_past_head_for_user_event() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -910,6 +915,7 @@ async fn test_get_thread_contents() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -1000,6 +1006,7 @@ async fn test_base_instructions_missing_in_meta_defaults_to_null() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -1043,6 +1050,7 @@ async fn test_base_instructions_present_in_meta_is_preserved() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -1101,6 +1109,7 @@ async fn test_created_at_sort_uses_file_mtime_for_updated_at() -> Result<()> {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await?;
@@ -1186,6 +1195,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
ThreadSortKey::UpdatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await?;
@@ -1247,6 +1257,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -1315,6 +1326,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -1363,6 +1375,7 @@ async fn test_source_filter_excludes_non_matching_sessions() {
ThreadSortKey::CreatedAt,
INTERACTIVE_SESSION_SOURCES.as_slice(),
Some(provider_filter.as_slice()),
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -1385,6 +1398,7 @@ async fn test_source_filter_excludes_non_matching_sessions() {
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
/*model_providers*/ None,
/*cwd_filters*/ None,
TEST_PROVIDER,
)
.await
@@ -1447,6 +1461,7 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<(
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
Some(openai_filter.as_slice()),
/*cwd_filters*/ None,
"openai",
)
.await?;
@@ -1467,6 +1482,7 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<(
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
Some(beta_filter.as_slice()),
/*cwd_filters*/ None,
"openai",
)
.await?;
@@ -1486,6 +1502,7 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<(
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
Some(unknown_filter.as_slice()),
/*cwd_filters*/ None,
"openai",
)
.await?;
@@ -1498,6 +1515,7 @@ async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<(
ThreadSortKey::CreatedAt,
NO_SOURCE_FILTER,
/*model_providers*/ None,
/*cwd_filters*/ None,
"openai",
)
.await?;
@@ -0,0 +1,2 @@
CREATE INDEX idx_threads_archived_cwd_created_at_ms ON threads(archived, cwd, created_at_ms DESC, id DESC);
CREATE INDEX idx_threads_archived_cwd_updated_at_ms ON threads(archived, cwd, updated_at_ms DESC, id DESC);
+1
View File
@@ -175,6 +175,7 @@ LEFT JOIN jobs
archived_only: false,
allowed_sources,
model_providers: None,
cwd_filters: None,
anchor: None,
sort_key: SortKey::UpdatedAt,
sort_direction: SortDirection::Desc,
+91
View File
@@ -359,6 +359,7 @@ ON CONFLICT(child_thread_id) DO NOTHING
archived_only,
allowed_sources,
model_providers,
cwd_filters: None,
anchor: None,
sort_key: crate::SortKey::UpdatedAt,
sort_direction: SortDirection::Desc,
@@ -437,6 +438,7 @@ ON CONFLICT(child_thread_id) DO NOTHING
archived_only,
allowed_sources,
model_providers,
cwd_filters: None,
anchor,
sort_key,
sort_direction: SortDirection::Desc,
@@ -1002,6 +1004,7 @@ pub struct ThreadFilterOptions<'a> {
pub archived_only: bool,
pub allowed_sources: &'a [String],
pub model_providers: Option<&'a [String]>,
pub cwd_filters: Option<&'a [PathBuf]>,
pub anchor: Option<&'a crate::Anchor>,
pub sort_key: SortKey,
pub sort_direction: SortDirection,
@@ -1016,6 +1019,7 @@ pub(super) fn push_thread_filters<'a>(
archived_only,
allowed_sources,
model_providers,
cwd_filters,
anchor,
sort_key,
sort_direction,
@@ -1046,6 +1050,20 @@ pub(super) fn push_thread_filters<'a>(
}
separated.push_unseparated(")");
}
match cwd_filters {
Some([]) => {
builder.push(" AND 1 = 0");
}
Some(cwd_filters) => {
builder.push(" AND threads.cwd IN (");
let mut separated = builder.separated(", ");
for cwd in cwd_filters {
separated.push_bind(cwd.display().to_string());
}
separated.push_unseparated(")");
}
None => {}
}
if let Some(search_term) = search_term {
builder.push(" AND instr(threads.title, ");
builder.push_bind(search_term);
@@ -1188,6 +1206,7 @@ mod tests {
archived_only: false,
allowed_sources: &[],
model_providers: Some(&model_providers),
cwd_filters: None,
anchor: Some(&anchor),
sort_key: SortKey::UpdatedAt,
sort_direction: SortDirection::Asc,
@@ -1214,6 +1233,7 @@ mod tests {
archived_only: false,
allowed_sources: &[],
model_providers: Some(&model_providers),
cwd_filters: None,
anchor: page.next_anchor.as_ref(),
sort_key: SortKey::UpdatedAt,
sort_direction: SortDirection::Asc,
@@ -1228,6 +1248,77 @@ mod tests {
assert_eq!(page.next_anchor, None);
}
#[tokio::test]
async fn list_threads_filters_by_cwd() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("state db should initialize");
let first_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000101").expect("valid thread id");
let second_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000102").expect("valid thread id");
let other_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000103").expect("valid thread id");
let first_cwd = codex_home.join("first");
let second_cwd = codex_home.join("second");
let other_cwd = codex_home.join("other");
for (thread_id, cwd, updated_at) in [
(first_id, first_cwd.clone(), 1_700_000_100),
(second_id, second_cwd.clone(), 1_700_000_300),
(other_id, other_cwd, 1_700_000_500),
] {
let mut metadata = test_thread_metadata(&codex_home, thread_id, cwd);
metadata.updated_at =
DateTime::<Utc>::from_timestamp(updated_at, 0).expect("valid timestamp");
runtime
.upsert_thread(&metadata)
.await
.expect("thread insert should succeed");
}
let cwd_filters = vec![first_cwd, second_cwd];
let page = runtime
.list_threads(
/*page_size*/ 10,
ThreadFilterOptions {
archived_only: false,
allowed_sources: &[],
model_providers: None,
cwd_filters: Some(cwd_filters.as_slice()),
anchor: None,
sort_key: SortKey::UpdatedAt,
sort_direction: SortDirection::Desc,
search_term: None,
},
)
.await
.expect("list should succeed");
let ids = page.items.iter().map(|item| item.id).collect::<Vec<_>>();
assert_eq!(ids, vec![second_id, first_id]);
let page = runtime
.list_threads(
/*page_size*/ 10,
ThreadFilterOptions {
archived_only: false,
allowed_sources: &[],
model_providers: None,
cwd_filters: Some(&[]),
anchor: None,
sort_key: SortKey::UpdatedAt,
sort_direction: SortDirection::Desc,
search_term: None,
},
)
.await
.expect("list with empty cwd filters should succeed");
assert_eq!(page.items, Vec::new());
}
#[tokio::test]
async fn apply_rollout_items_restores_memory_mode_from_session_meta() {
let codex_home = unique_temp_dir();
@@ -103,8 +103,10 @@ mod tests {
sort_direction: crate::SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
cwd_filters: None,
archived: true,
search_term: None,
use_state_db_only: false,
})
.await
.expect("archived listing");
@@ -68,7 +68,35 @@ async fn list_rollout_threads(
sort_key: codex_rollout::ThreadSortKey,
sort_direction: codex_rollout::SortDirection,
) -> ThreadStoreResult<codex_rollout::ThreadsPage> {
let page = if params.archived {
let page = if params.use_state_db_only && params.archived {
RolloutRecorder::list_archived_threads_from_state_db(
config,
params.page_size,
cursor,
sort_key,
sort_direction,
params.allowed_sources.as_slice(),
params.model_providers.as_deref(),
params.cwd_filters.as_deref(),
config.model_provider_id.as_str(),
params.search_term.as_deref(),
)
.await
} else if params.use_state_db_only {
RolloutRecorder::list_threads_from_state_db(
config,
params.page_size,
cursor,
sort_key,
sort_direction,
params.allowed_sources.as_slice(),
params.model_providers.as_deref(),
params.cwd_filters.as_deref(),
config.model_provider_id.as_str(),
params.search_term.as_deref(),
)
.await
} else if params.archived {
RolloutRecorder::list_archived_threads(
config,
params.page_size,
@@ -77,6 +105,7 @@ async fn list_rollout_threads(
sort_direction,
params.allowed_sources.as_slice(),
params.model_providers.as_deref(),
params.cwd_filters.as_deref(),
config.model_provider_id.as_str(),
params.search_term.as_deref(),
)
@@ -90,6 +119,7 @@ async fn list_rollout_threads(
sort_direction,
params.allowed_sources.as_slice(),
params.model_providers.as_deref(),
params.cwd_filters.as_deref(),
config.model_provider_id.as_str(),
params.search_term.as_deref(),
)
@@ -140,8 +170,10 @@ mod tests {
sort_direction: SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
cwd_filters: None,
archived: false,
search_term: None,
use_state_db_only: false,
})
.await
.expect("thread listing");
@@ -196,8 +228,10 @@ mod tests {
sort_direction: SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
cwd_filters: None,
archived: false,
search_term: Some("needle".to_string()),
use_state_db_only: true,
})
.await
.expect("thread listing");
@@ -233,8 +267,10 @@ mod tests {
sort_direction: SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
cwd_filters: None,
archived: false,
search_term: None,
use_state_db_only: false,
})
.await
.expect("active listing");
@@ -246,8 +282,10 @@ mod tests {
sort_direction: SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
cwd_filters: None,
archived: true,
search_term: None,
use_state_db_only: false,
})
.await
.expect("archived listing");
@@ -295,8 +333,10 @@ mod tests {
sort_direction: SortDirection::Desc,
allowed_sources: vec![SessionSource::Cli],
model_providers: Some(vec!["test-provider".to_string()]),
cwd_filters: None,
archived: false,
search_term: None,
use_state_db_only: false,
})
.await
.expect("thread listing");
@@ -329,8 +369,10 @@ mod tests {
sort_direction: SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
cwd_filters: None,
archived: false,
search_term: None,
use_state_db_only: false,
})
.await
.expect_err("invalid cursor should fail");
@@ -30,8 +30,15 @@ pub(super) async fn list_threads(
model_provider_filter: params
.model_providers
.map(|values| proto::ModelProviderFilter { values }),
cwd_filter: params.cwd_filters.map(|values| proto::CwdFilter {
values: values
.into_iter()
.map(|cwd| cwd.display().to_string())
.collect(),
}),
archived: params.archived,
search_term: params.search_term,
use_state_db_only: params.use_state_db_only,
};
let response = store
@@ -91,12 +98,19 @@ mod tests {
);
assert_eq!(request.archived, true);
assert_eq!(request.search_term.as_deref(), Some("needle"));
assert!(request.use_state_db_only);
assert_eq!(
request.model_provider_filter,
Some(proto::ModelProviderFilter {
values: vec!["openai".to_string()],
})
);
assert_eq!(
request.cwd_filter,
Some(proto::CwdFilter {
values: vec!["/workspace".to_string()],
})
);
assert_eq!(request.allowed_sources.len(), 1);
assert_eq!(
proto::SessionSourceKind::try_from(request.allowed_sources[0].kind),
@@ -164,8 +178,10 @@ mod tests {
sort_direction: crate::SortDirection::Desc,
allowed_sources: vec![SessionSource::Cli],
model_providers: Some(vec!["openai".to_string()]),
cwd_filters: Some(vec![PathBuf::from("/workspace")]),
archived: true,
search_term: Some("needle".to_string()),
use_state_db_only: true,
})
.await
.expect("list threads");
@@ -14,12 +14,18 @@ message ListThreadsRequest {
optional ModelProviderFilter model_provider_filter = 5;
bool archived = 6;
optional string search_term = 7;
optional CwdFilter cwd_filter = 8;
bool use_state_db_only = 9;
}
message ModelProviderFilter {
repeated string values = 1;
}
message CwdFilter {
repeated string values = 1;
}
enum ThreadSortKey {
THREAD_SORT_KEY_CREATED_AT = 0;
THREAD_SORT_KEY_UPDATED_AT = 1;
@@ -17,12 +17,21 @@ pub struct ListThreadsRequest {
pub archived: bool,
#[prost(string, optional, tag = "7")]
pub search_term: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, optional, tag = "8")]
pub cwd_filter: ::core::option::Option<CwdFilter>,
#[prost(bool, tag = "9")]
pub use_state_db_only: bool,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ModelProviderFilter {
#[prost(string, repeated, tag = "1")]
pub values: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct CwdFilter {
#[prost(string, repeated, tag = "1")]
pub values: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListThreadsResponse {
#[prost(message, repeated, tag = "1")]
+5
View File
@@ -128,10 +128,15 @@ pub struct ListThreadsParams {
/// Optional model provider filter. `None` means implementation default, while an empty vector
/// means all providers.
pub model_providers: Option<Vec<String>>,
/// Optional cwd filters. `None` means all working directories, while an empty vector matches no
/// threads.
pub cwd_filters: Option<Vec<PathBuf>>,
/// Whether archived threads should be listed instead of active threads.
pub archived: bool,
/// Optional substring/full-text search term for thread title/preview.
pub search_term: Option<String>,
/// Return directly from the state DB without scanning JSONL rollouts to repair metadata.
pub use_state_db_only: bool,
}
/// A page of stored thread records.
+12 -3
View File
@@ -27,6 +27,7 @@ use codex_app_server_protocol::Account as AppServerAccount;
use codex_app_server_protocol::AuthMode as AppServerAuthMode;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::Thread as AppServerThread;
use codex_app_server_protocol::ThreadListCwdFilter;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadSortKey as AppServerThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
@@ -523,6 +524,7 @@ async fn lookup_session_target_by_name_with_app_server(
source_kinds: Some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]),
archived: Some(false),
cwd: None,
use_state_db_only: false,
search_term: Some(name.to_string()),
})
.await?;
@@ -614,7 +616,8 @@ fn latest_session_lookup_params(
source_kinds: (!include_non_interactive)
.then_some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]),
archived: Some(false),
cwd: cwd_filter.map(|cwd| cwd.to_string_lossy().to_string()),
cwd: cwd_filter.map(|cwd| ThreadListCwdFilter::One(cwd.to_string_lossy().to_string())),
use_state_db_only: false,
search_term: None,
}
}
@@ -1883,7 +1886,10 @@ mod tests {
);
assert_eq!(params.model_providers, Some(vec![config.model_provider_id]));
assert_eq!(params.cwd, Some(cwd.to_string_lossy().to_string()));
assert_eq!(
params.cwd,
Some(ThreadListCwdFilter::One(cwd.to_string_lossy().to_string()))
);
Ok(())
}
@@ -1918,7 +1924,10 @@ mod tests {
);
assert_eq!(params.model_providers, None);
assert_eq!(params.cwd.as_deref(), Some("repo/on/server"));
assert_eq!(
params.cwd,
Some(ThreadListCwdFilter::One(String::from("repo/on/server")))
);
Ok(())
}
+7 -2
View File
@@ -14,6 +14,7 @@ use crate::tui::TuiEvent;
use chrono::DateTime;
use chrono::Utc;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadListCwdFilter;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
@@ -1002,7 +1003,8 @@ fn thread_list_params(
source_kinds: (!include_non_interactive)
.then_some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]),
archived: Some(false),
cwd: cwd_filter.map(|cwd| cwd.to_string_lossy().to_string()),
cwd: cwd_filter.map(|cwd| ThreadListCwdFilter::One(cwd.to_string_lossy().into_owned())),
use_state_db_only: false,
search_term: None,
}
}
@@ -1573,7 +1575,10 @@ mod tests {
params.source_kinds,
Some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode])
);
assert_eq!(params.cwd.as_deref(), Some("repo/on/server"));
assert_eq!(
params.cwd,
Some(ThreadListCwdFilter::One(String::from("repo/on/server")))
);
}
#[test]