mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
feat(app-server): list descendant threads by ancestor (#29591)
## Why `thread/list` can filter direct children with `parentThreadId`, but clients cannot request an entire spawned subtree. Discovering every descendant requires repeated client-side requests and gives up the database's existing filtering and pagination path. ## What changed Experimental clients can use `ancestorThreadId` to return strict descendants at any depth while `parentThreadId` retains its direct-child meaning. The filters are mutually exclusive, the ancestor is excluded, and every result preserves its immediate `parentThreadId` so callers can reconstruct the tree. ## How it works - **Explicit relationship:** Internal list parameters distinguish direct children from transitive descendants without changing the meaning of `parentThreadId`. - **Existing graph:** Persisted parent-child spawn edges remain the source of truth, so descendant lookup needs no schema migration or ancestry cache. - **Indexed traversal:** A recursive SQLite query starts from the parent-edge index, walks each generation, and applies thread filters, sorting, and cursor pagination in the same database request. - **Reconstructable results:** The response stays flat and normally ordered while carrying each descendant's immediate parent. ## Verification Ran 550 tests across the protocol, state, rollout, and thread-store crates, then reran the four focused state, store, and app-server descendant-listing tests after the final diff reduction. Scoped Clippy and formatting checks passed. Stable and experimental schema generation was checked; the stable fixtures remain unchanged while the experimental schema includes the new field.
This commit is contained in:
committed by
GitHub
Unverified
parent
3ccef20ef4
commit
8057603d0c
@@ -1088,10 +1088,15 @@ pub struct ThreadListParams {
|
||||
/// Optional substring filter for the extracted thread title.
|
||||
#[ts(optional = nullable)]
|
||||
pub search_term: Option<String>,
|
||||
/// Optional direct parent thread filter.
|
||||
/// Optional direct parent thread filter. Mutually exclusive with `ancestorThreadId`.
|
||||
#[experimental("thread/list.parentThreadId")]
|
||||
#[ts(optional = nullable)]
|
||||
pub parent_thread_id: Option<String>,
|
||||
/// Optional ancestor thread filter. Returns spawned descendants at any depth, excluding the
|
||||
/// ancestor itself. Mutually exclusive with `parentThreadId`.
|
||||
#[experimental("thread/list.ancestorThreadId")]
|
||||
#[ts(optional = nullable)]
|
||||
pub ancestor_thread_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
|
||||
@@ -1222,6 +1222,7 @@ async fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u3
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
cwd: None,
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
|
||||
@@ -141,7 +141,7 @@ Example with notification opt-out:
|
||||
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. Accepts the same permission override rules as `thread/start`. Multi-agent mode restores the last effective mode from rollout history when available; clients can select another mode on the first `turn/start`.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`.
|
||||
- `thread/start`, `thread/resume`, and `thread/fork` responses include the legacy `sandbox` compatibility projection. `instructionSources` lists loaded instruction files using each source environment's native absolute path syntax, including files loaded from remote environments. Experimental clients can read `runtimeWorkspaceRoots` for the thread-scoped runtime roots and `activePermissionProfile` for the named or implicit built-in profile identity/provenance when known. Their experimental `multiAgentMode` field, and the corresponding thread setting, report the thread's current mode. Turn construction separately determines whether that mode is applicable to the selected model and runtime configuration.
|
||||
- `thread/list` — page through stored threads; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Experimental clients can use `parentThreadId` to filter direct spawned children represented by persisted spawn-edge state. Review and Guardian threads are not included because they do not participate in that spawn-edge lifecycle. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. Subagent threads also include `parentThreadId` when the immediate parent is known.
|
||||
- `thread/list` — page through stored threads; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Experimental clients can use `parentThreadId` for direct spawned children or `ancestorThreadId` for spawned descendants at any depth; the two filters are mutually exclusive. Review and Guardian threads are not included because they do not participate in that spawn-edge lifecycle. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. Subagent threads also include `parentThreadId` when the immediate parent is known.
|
||||
- `thread/loaded/list` — list the thread ids currently loaded in memory.
|
||||
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/turns/list` — experimental; page through a stored thread’s turn history without resuming it; supports cursor-based pagination with `sortDirection`, `itemsView`, `nextCursor`, and `backwardsCursor`.
|
||||
@@ -403,18 +403,19 @@ Example:
|
||||
|
||||
When `nextCursor` is `null`, you’ve reached the final page.
|
||||
|
||||
### Example: List direct child threads
|
||||
### Example: List descendant threads
|
||||
|
||||
Enable `capabilities.experimentalApi` during initialization, then use `thread/list` with `parentThreadId` to page through a thread's direct spawned children from persisted spawn-edge state. Results do not recursively include grandchildren. Review and Guardian threads are not included because they do not participate in the spawn-edge lifecycle. When `modelProviders` or `sourceKinds` is omitted, parent-filtered requests include every provider or source kind, respectively. Explicit filters retain the ordinary `thread/list` behavior, including the interactive-only default for an empty `sourceKinds` list.
|
||||
Enable `capabilities.experimentalApi` during initialization, then use `thread/list` with `ancestorThreadId` to page through every spawned descendant of a thread from persisted spawn-edge state. The ancestor itself is excluded, and each result's `parentThreadId` remains its immediate parent. Use `parentThreadId` instead when only direct children are wanted; sending both filters is invalid. Review and Guardian threads are not included because they do not participate in the spawn-edge lifecycle. When `modelProviders` or `sourceKinds` is omitted, relationship-filtered requests include every provider or source kind, respectively. Explicit filters retain the ordinary `thread/list` behavior, including the interactive-only default for an empty `sourceKinds` list.
|
||||
|
||||
```json
|
||||
{ "method": "thread/list", "id": 21, "params": {
|
||||
"parentThreadId": "00000000-0000-0000-0000-000000000100",
|
||||
"ancestorThreadId": "00000000-0000-0000-0000-000000000100",
|
||||
"limit": 25
|
||||
} }
|
||||
{ "id": 21, "result": {
|
||||
"data": [
|
||||
{ "id": "00000000-0000-0000-0000-000000000101", "parentThreadId": "00000000-0000-0000-0000-000000000100", "status": { "type": "notLoaded" } }
|
||||
{ "id": "00000000-0000-0000-0000-000000000101", "parentThreadId": "00000000-0000-0000-0000-000000000100", "status": { "type": "notLoaded" } },
|
||||
{ "id": "00000000-0000-0000-0000-000000000102", "parentThreadId": "00000000-0000-0000-0000-000000000101", "status": { "type": "notLoaded" } }
|
||||
],
|
||||
"nextCursor": null,
|
||||
"backwardsCursor": null
|
||||
|
||||
@@ -446,6 +446,7 @@ use codex_thread_store::SearchThreadsParams as StoreSearchThreadsParams;
|
||||
use codex_thread_store::SortDirection as StoreSortDirection;
|
||||
use codex_thread_store::StoredThread;
|
||||
use codex_thread_store::ThreadMetadataPatch as StoreThreadMetadataPatch;
|
||||
use codex_thread_store::ThreadRelationFilter as StoreThreadRelationFilter;
|
||||
use codex_thread_store::ThreadSortKey as StoreThreadSortKey;
|
||||
use codex_thread_store::ThreadStore;
|
||||
use codex_thread_store::ThreadStoreError;
|
||||
|
||||
@@ -16,7 +16,7 @@ struct ThreadListFilters {
|
||||
cwd_filters: Option<Vec<PathBuf>>,
|
||||
search_term: Option<String>,
|
||||
use_state_db_only: bool,
|
||||
parent_thread_id: Option<ThreadId>,
|
||||
relation_filter: Option<StoreThreadRelationFilter>,
|
||||
}
|
||||
|
||||
fn collect_resume_override_mismatches(
|
||||
@@ -1904,13 +1904,25 @@ impl ThreadRequestProcessor {
|
||||
use_state_db_only,
|
||||
search_term,
|
||||
parent_thread_id,
|
||||
ancestor_thread_id,
|
||||
} = params;
|
||||
let cwd_filters = normalize_thread_list_cwd_filters(cwd)?;
|
||||
let parent_thread_id = parent_thread_id
|
||||
.as_deref()
|
||||
.map(ThreadId::from_string)
|
||||
.transpose()
|
||||
.map_err(|err| invalid_request(format!("invalid parent thread id: {err}")))?;
|
||||
let relation_filter = match (parent_thread_id, ancestor_thread_id) {
|
||||
(Some(_), Some(_)) => {
|
||||
return Err(invalid_request(
|
||||
"parentThreadId and ancestorThreadId are mutually exclusive",
|
||||
));
|
||||
}
|
||||
(Some(parent_thread_id), None) => Some(StoreThreadRelationFilter::DirectChildrenOf(
|
||||
ThreadId::from_string(&parent_thread_id)
|
||||
.map_err(|err| invalid_request(format!("invalid parent thread id: {err}")))?,
|
||||
)),
|
||||
(None, Some(ancestor_thread_id)) => Some(StoreThreadRelationFilter::DescendantsOf(
|
||||
ThreadId::from_string(&ancestor_thread_id)
|
||||
.map_err(|err| invalid_request(format!("invalid ancestor thread id: {err}")))?,
|
||||
)),
|
||||
(None, None) => None,
|
||||
};
|
||||
|
||||
let requested_page_size = limit
|
||||
.map(|value| value as usize)
|
||||
@@ -1935,7 +1947,7 @@ impl ThreadRequestProcessor {
|
||||
cwd_filters,
|
||||
search_term,
|
||||
use_state_db_only,
|
||||
parent_thread_id,
|
||||
relation_filter,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
@@ -3665,7 +3677,7 @@ impl ThreadRequestProcessor {
|
||||
cwd_filters,
|
||||
search_term,
|
||||
use_state_db_only,
|
||||
parent_thread_id,
|
||||
relation_filter,
|
||||
} = filters;
|
||||
let mut cursor_obj = cursor;
|
||||
let mut last_cursor = cursor_obj.clone();
|
||||
@@ -3681,11 +3693,11 @@ impl ThreadRequestProcessor {
|
||||
Some(providers)
|
||||
}
|
||||
}
|
||||
None if parent_thread_id.is_some() => None,
|
||||
None if relation_filter.is_some() => None,
|
||||
None => Some(vec![self.config.model_provider_id.clone()]),
|
||||
};
|
||||
let (allowed_sources_vec, source_kind_filter) =
|
||||
if parent_thread_id.is_some() && source_kinds.is_none() {
|
||||
if relation_filter.is_some() && source_kinds.is_none() {
|
||||
(Vec::new(), None)
|
||||
} else {
|
||||
compute_source_filters(source_kinds)
|
||||
@@ -3711,7 +3723,7 @@ impl ThreadRequestProcessor {
|
||||
archived,
|
||||
search_term: search_term.clone(),
|
||||
use_state_db_only,
|
||||
parent_thread_id,
|
||||
relation_filter,
|
||||
})
|
||||
.await
|
||||
.map_err(thread_store_list_error)?;
|
||||
|
||||
@@ -714,6 +714,7 @@ async fn external_agent_config_import_creates_session_rollouts() -> Result<()> {
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
@@ -892,6 +893,7 @@ required = true
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
@@ -983,6 +985,7 @@ async fn external_agent_config_import_accepts_detected_session_payload_after_res
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
@@ -1075,6 +1078,7 @@ async fn external_agent_config_import_skips_already_imported_session_versions()
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
@@ -1214,6 +1218,7 @@ async fn external_agent_config_import_returns_before_background_session_import_f
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
@@ -1343,6 +1348,7 @@ async fn external_agent_config_import_compacts_huge_session_before_first_follow_
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
|
||||
@@ -133,6 +133,7 @@ async fn thread_delete_with_non_local_thread_store_does_not_create_local_persist
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
},
|
||||
})
|
||||
.await?
|
||||
|
||||
@@ -72,6 +72,7 @@ async fn list_threads(mcp: &mut TestAppServer) -> Result<ThreadListResponse> {
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let list_resp: JSONRPCResponse = timeout(
|
||||
|
||||
@@ -97,6 +97,7 @@ async fn list_threads_with_sort(
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
@@ -107,14 +108,23 @@ async fn list_threads_with_sort(
|
||||
to_response::<ThreadListResponse>(resp)
|
||||
}
|
||||
|
||||
async fn list_threads_for_parent(
|
||||
enum ThreadListRelation {
|
||||
DirectChildrenOf(ThreadId),
|
||||
DescendantsOf(ThreadId),
|
||||
}
|
||||
|
||||
async fn list_threads_for_relation(
|
||||
mcp: &mut TestAppServer,
|
||||
parent_thread_id: ThreadId,
|
||||
relation: ThreadListRelation,
|
||||
cursor: Option<String>,
|
||||
limit: u32,
|
||||
model_providers: Option<Vec<String>>,
|
||||
source_kinds: Option<Vec<ThreadSourceKind>>,
|
||||
) -> Result<ThreadListResponse> {
|
||||
let (parent_thread_id, ancestor_thread_id) = match relation {
|
||||
ThreadListRelation::DirectChildrenOf(thread_id) => (Some(thread_id.to_string()), None),
|
||||
ThreadListRelation::DescendantsOf(thread_id) => (None, Some(thread_id.to_string())),
|
||||
};
|
||||
let request_id = mcp
|
||||
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
|
||||
cursor,
|
||||
@@ -127,7 +137,8 @@ async fn list_threads_for_parent(
|
||||
cwd: None,
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: Some(parent_thread_id.to_string()),
|
||||
parent_thread_id,
|
||||
ancestor_thread_id,
|
||||
})
|
||||
.await?;
|
||||
let response = timeout(
|
||||
@@ -571,6 +582,7 @@ async fn thread_list_respects_cwd_filters() -> Result<()> {
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
@@ -681,6 +693,7 @@ sqlite = true
|
||||
use_state_db_only: false,
|
||||
search_term: Some("needle".to_string()),
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
@@ -898,6 +911,7 @@ sqlite = true
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
@@ -937,6 +951,7 @@ sqlite = true
|
||||
use_state_db_only: true,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
@@ -967,6 +982,7 @@ sqlite = true
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
@@ -981,9 +997,10 @@ sqlite = true
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_list_parent_filter_reads_direct_children_from_state_db() -> Result<()> {
|
||||
async fn thread_list_relation_filters_read_spawn_graph_from_state_db() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
create_minimal_config(codex_home.path())?;
|
||||
let mut mcp = init_mcp(codex_home.path()).await?;
|
||||
let parent_id = ThreadId::new();
|
||||
let older_child_id = ThreadId::new();
|
||||
let newer_child_id = ThreadId::new();
|
||||
@@ -1044,16 +1061,19 @@ async fn thread_list_parent_filter_reads_direct_children_from_state_db() -> Resu
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
let mut mcp = init_mcp(codex_home.path()).await?;
|
||||
|
||||
let first_page = list_threads_for_parent(
|
||||
&mut mcp, parent_id, /*cursor*/ None, /*limit*/ 1, /*model_providers*/ None,
|
||||
let first_page = list_threads_for_relation(
|
||||
&mut mcp,
|
||||
ThreadListRelation::DirectChildrenOf(parent_id),
|
||||
/*cursor*/ None,
|
||||
/*limit*/ 1,
|
||||
/*model_providers*/ None,
|
||||
/*source_kinds*/ None,
|
||||
)
|
||||
.await?;
|
||||
let second_page = list_threads_for_parent(
|
||||
let second_page = list_threads_for_relation(
|
||||
&mut mcp,
|
||||
parent_id,
|
||||
ThreadListRelation::DirectChildrenOf(parent_id),
|
||||
first_page.next_cursor.clone(),
|
||||
/*limit*/ 1,
|
||||
/*model_providers*/ None,
|
||||
@@ -1086,9 +1106,9 @@ async fn thread_list_parent_filter_reads_direct_children_from_state_db() -> Resu
|
||||
.chain(&second_page.data)
|
||||
.all(|thread| thread.parent_thread_id.as_deref() == Some(expected_parent_id.as_str()))
|
||||
);
|
||||
let interactive_only = list_threads_for_parent(
|
||||
let interactive_only = list_threads_for_relation(
|
||||
&mut mcp,
|
||||
parent_id,
|
||||
ThreadListRelation::DirectChildrenOf(parent_id),
|
||||
/*cursor*/ None,
|
||||
/*limit*/ 10,
|
||||
/*model_providers*/ None,
|
||||
@@ -1103,11 +1123,34 @@ async fn thread_list_parent_filter_reads_direct_children_from_state_db() -> Resu
|
||||
.collect::<Vec<_>>(),
|
||||
vec![newer_child_id.to_string()]
|
||||
);
|
||||
|
||||
let descendants = list_threads_for_relation(
|
||||
&mut mcp,
|
||||
ThreadListRelation::DescendantsOf(parent_id),
|
||||
/*cursor*/ None,
|
||||
/*limit*/ 10,
|
||||
/*model_providers*/ None,
|
||||
/*source_kinds*/ None,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(
|
||||
descendants
|
||||
.data
|
||||
.iter()
|
||||
.map(|thread| (thread.id.clone(), thread.parent_thread_id.clone()))
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
(grandchild_id.to_string(), Some(newer_child_id.to_string())),
|
||||
(newer_child_id.to_string(), Some(parent_id.to_string())),
|
||||
(older_child_id.to_string(), Some(parent_id.to_string())),
|
||||
]
|
||||
);
|
||||
assert_eq!(descendants.next_cursor, None);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_list_parent_filter_rejects_malformed_thread_id() -> Result<()> {
|
||||
async fn thread_list_relation_filters_reject_invalid_requests() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
create_minimal_config(codex_home.path())?;
|
||||
let mut mcp = init_mcp(codex_home.path()).await?;
|
||||
@@ -1124,6 +1167,7 @@ async fn thread_list_parent_filter_rejects_malformed_thread_id() -> Result<()> {
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: Some("not-a-thread-id".to_string()),
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let error = timeout(
|
||||
@@ -1133,6 +1177,34 @@ async fn thread_list_parent_filter_rejects_malformed_thread_id() -> Result<()> {
|
||||
.await??;
|
||||
assert_eq!(error.error.code, -32600);
|
||||
|
||||
let thread_id = ThreadId::new().to_string();
|
||||
let request_id = mcp
|
||||
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
|
||||
cursor: None,
|
||||
limit: Some(10),
|
||||
sort_key: None,
|
||||
sort_direction: None,
|
||||
model_providers: None,
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
cwd: None,
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: Some(thread_id.clone()),
|
||||
ancestor_thread_id: Some(thread_id),
|
||||
})
|
||||
.await?;
|
||||
let error = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
assert_eq!(error.error.code, -32600);
|
||||
assert_eq!(
|
||||
error.error.message,
|
||||
"parentThreadId and ancestorThreadId are mutually exclusive"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1908,6 +1980,7 @@ async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<()
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
@@ -1951,6 +2024,7 @@ async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<()
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
@@ -2190,6 +2264,7 @@ async fn thread_list_invalid_cursor_returns_error() -> Result<()> {
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let error: JSONRPCError = timeout(
|
||||
|
||||
@@ -567,6 +567,7 @@ async fn thread_list_includes_store_thread_without_rollout_path() -> Result<()>
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
},
|
||||
})
|
||||
.await?
|
||||
@@ -962,6 +963,7 @@ async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> {
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
})
|
||||
.await?;
|
||||
let list_resp: JSONRPCResponse = timeout(
|
||||
|
||||
@@ -88,7 +88,7 @@ async fn has_threads(store: &LocalThreadStore, archived: bool) -> io::Result<boo
|
||||
allowed_sources: Vec::new(),
|
||||
model_providers: None,
|
||||
cwd_filters: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
archived,
|
||||
search_term: None,
|
||||
use_state_db_only: false,
|
||||
|
||||
@@ -137,7 +137,7 @@ async fn load_recent_threads(sess: &Session) -> Vec<StoredThread> {
|
||||
allowed_sources: Vec::new(),
|
||||
model_providers: None,
|
||||
cwd_filters: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
archived: false,
|
||||
search_term: None,
|
||||
use_state_db_only: false,
|
||||
|
||||
@@ -1471,6 +1471,7 @@ async fn resolve_resume_thread_id(
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
cwd: None,
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
@@ -1537,6 +1538,7 @@ async fn resolve_resume_thread_id(
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
cwd: None,
|
||||
use_state_db_only: false,
|
||||
search_term: Some(session_id.to_string()),
|
||||
|
||||
@@ -403,7 +403,7 @@ impl RolloutRecorder {
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
cwd_filters,
|
||||
/*parent_thread_id*/ None,
|
||||
/*relation_filter*/ None,
|
||||
archived,
|
||||
search_term,
|
||||
)
|
||||
@@ -512,7 +512,7 @@ impl RolloutRecorder {
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
cwd_filters,
|
||||
/*parent_thread_id*/ None,
|
||||
/*relation_filter*/ None,
|
||||
archived,
|
||||
search_term,
|
||||
)
|
||||
@@ -541,7 +541,7 @@ impl RolloutRecorder {
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
cwd_filters,
|
||||
/*parent_thread_id*/ None,
|
||||
/*relation_filter*/ None,
|
||||
archived,
|
||||
search_term,
|
||||
)
|
||||
@@ -581,7 +581,7 @@ impl RolloutRecorder {
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
cwd_filters,
|
||||
/*parent_thread_id*/ None,
|
||||
/*relation_filter*/ None,
|
||||
archived,
|
||||
search_term,
|
||||
)
|
||||
@@ -659,7 +659,7 @@ impl RolloutRecorder {
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
cwd_filter.as_ref().map(std::slice::from_ref),
|
||||
/*parent_thread_id*/ None,
|
||||
/*relation_filter*/ None,
|
||||
/*archived*/ false,
|
||||
/*search_term*/ None,
|
||||
)
|
||||
@@ -1083,7 +1083,10 @@ async fn fill_missing_thread_item_metadata_from_state_db(
|
||||
continue;
|
||||
}
|
||||
};
|
||||
fill_missing_thread_item_metadata(item, thread_item_from_state_metadata(metadata));
|
||||
fill_missing_thread_item_metadata(
|
||||
item,
|
||||
thread_item_from_state_metadata(metadata, /*parent_thread_id*/ None),
|
||||
);
|
||||
}
|
||||
|
||||
page
|
||||
@@ -1775,21 +1778,32 @@ impl JsonlWriter {
|
||||
|
||||
impl From<codex_state::ThreadsPage> for ThreadsPage {
|
||||
fn from(db_page: codex_state::ThreadsPage) -> Self {
|
||||
let items = db_page
|
||||
.items
|
||||
let codex_state::ThreadsPage {
|
||||
items,
|
||||
parent_thread_ids,
|
||||
next_anchor,
|
||||
num_scanned_rows,
|
||||
} = db_page;
|
||||
let items = items
|
||||
.into_iter()
|
||||
.map(thread_item_from_state_metadata)
|
||||
.map(|item| {
|
||||
let parent_thread_id = parent_thread_ids.get(&item.id).copied();
|
||||
thread_item_from_state_metadata(item, parent_thread_id)
|
||||
})
|
||||
.collect();
|
||||
Self {
|
||||
items,
|
||||
next_cursor: db_page.next_anchor.map(Into::into),
|
||||
num_scanned_files: db_page.num_scanned_rows,
|
||||
next_cursor: next_anchor.map(Into::into),
|
||||
num_scanned_files: num_scanned_rows,
|
||||
reached_scan_cap: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_item_from_state_metadata(item: codex_state::ThreadMetadata) -> ThreadItem {
|
||||
fn thread_item_from_state_metadata(
|
||||
item: codex_state::ThreadMetadata,
|
||||
parent_thread_id: Option<ThreadId>,
|
||||
) -> ThreadItem {
|
||||
ThreadItem {
|
||||
path: item.rollout_path,
|
||||
thread_id: Some(item.id),
|
||||
@@ -1804,7 +1818,7 @@ fn thread_item_from_state_metadata(item: codex_state::ThreadMetadata) -> ThreadI
|
||||
.or_else(|_| serde_json::from_value(Value::String(item.source)))
|
||||
.unwrap_or(SessionSource::Unknown),
|
||||
),
|
||||
parent_thread_id: None,
|
||||
parent_thread_id,
|
||||
agent_nickname: item.agent_nickname,
|
||||
agent_role: item.agent_role,
|
||||
model_provider: Some(item.model_provider),
|
||||
|
||||
@@ -367,7 +367,7 @@ pub async fn list_threads_db(
|
||||
allowed_sources: &[SessionSource],
|
||||
model_providers: Option<&[String]>,
|
||||
cwd_filters: Option<&[PathBuf]>,
|
||||
parent_thread_id: Option<ThreadId>,
|
||||
relation_filter: Option<codex_state::ThreadRelationFilter>,
|
||||
archived: bool,
|
||||
search_term: Option<&str>,
|
||||
) -> Option<codex_state::ThreadsPage> {
|
||||
@@ -413,17 +413,17 @@ pub async fn list_threads_db(
|
||||
},
|
||||
search_term,
|
||||
};
|
||||
let page = match parent_thread_id {
|
||||
Some(parent_thread_id) => {
|
||||
ctx.list_threads_by_parent(page_size, parent_thread_id, filters)
|
||||
let page = match relation_filter {
|
||||
Some(relation_filter) => {
|
||||
ctx.list_threads_by_relation(page_size, relation_filter, filters)
|
||||
.await
|
||||
}
|
||||
None => ctx.list_threads(page_size, filters).await,
|
||||
};
|
||||
match page {
|
||||
Ok(mut page) => {
|
||||
// Parent-filtered listings intentionally treat persisted state as authoritative.
|
||||
if parent_thread_id.is_some() {
|
||||
// Relationship-filtered listings intentionally treat persisted state as authoritative.
|
||||
if relation_filter.is_some() {
|
||||
return Some(page);
|
||||
}
|
||||
let mut valid_items = Vec::with_capacity(page.items.len());
|
||||
|
||||
@@ -55,6 +55,7 @@ pub use model::ThreadGoal;
|
||||
pub use model::ThreadGoalStatus;
|
||||
pub use model::ThreadMetadata;
|
||||
pub use model::ThreadMetadataBuilder;
|
||||
pub use model::ThreadRelationFilter;
|
||||
pub use model::ThreadsPage;
|
||||
pub use runtime::ExternalAgentConfigImportDetailsRecord;
|
||||
pub use runtime::ExternalAgentConfigImportFailureRecord;
|
||||
|
||||
@@ -33,6 +33,7 @@ pub use thread_metadata::SortDirection;
|
||||
pub use thread_metadata::SortKey;
|
||||
pub use thread_metadata::ThreadMetadata;
|
||||
pub use thread_metadata::ThreadMetadataBuilder;
|
||||
pub use thread_metadata::ThreadRelationFilter;
|
||||
pub use thread_metadata::ThreadsPage;
|
||||
|
||||
pub(crate) use agent_job::AgentJobItemRow;
|
||||
|
||||
@@ -9,6 +9,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::ThreadSource;
|
||||
use sqlx::Row;
|
||||
use sqlx::sqlite::SqliteRow;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// The sort key to use when listing threads.
|
||||
@@ -29,6 +30,15 @@ pub enum SortDirection {
|
||||
Desc,
|
||||
}
|
||||
|
||||
/// Spawn-graph relationship used to filter thread listings.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ThreadRelationFilter {
|
||||
/// Return only threads whose immediate parent is the given thread.
|
||||
DirectChildrenOf(ThreadId),
|
||||
/// Return every thread transitively descended from the given thread.
|
||||
DescendantsOf(ThreadId),
|
||||
}
|
||||
|
||||
/// A pagination anchor used for keyset pagination.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Anchor {
|
||||
@@ -43,6 +53,8 @@ pub struct Anchor {
|
||||
pub struct ThreadsPage {
|
||||
/// The thread metadata items in this page.
|
||||
pub items: Vec<ThreadMetadata>,
|
||||
/// Immediate parents for page items found through the persisted spawn graph.
|
||||
pub parent_thread_ids: HashMap<ThreadId, ThreadId>,
|
||||
/// The next anchor to use for pagination, if any.
|
||||
pub next_anchor: Option<Anchor>,
|
||||
/// The number of rows scanned to produce this page.
|
||||
@@ -475,7 +487,11 @@ impl TryFrom<ThreadRow> for ThreadMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn anchor_from_item(item: &ThreadMetadata, sort_key: SortKey) -> Option<Anchor> {
|
||||
pub(crate) fn anchor_from_item(
|
||||
item: &ThreadMetadata,
|
||||
sort_key: SortKey,
|
||||
include_thread_id_tiebreaker: bool,
|
||||
) -> Option<Anchor> {
|
||||
let ts = match sort_key {
|
||||
SortKey::CreatedAt => item.created_at,
|
||||
SortKey::UpdatedAt => item.updated_at,
|
||||
@@ -483,7 +499,7 @@ pub(crate) fn anchor_from_item(item: &ThreadMetadata, sort_key: SortKey) -> Opti
|
||||
};
|
||||
Some(Anchor {
|
||||
ts,
|
||||
id: (sort_key == SortKey::RecencyAt).then_some(item.id),
|
||||
id: (include_thread_id_tiebreaker || sort_key == SortKey::RecencyAt).then_some(item.id),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -211,6 +211,7 @@ FROM threads
|
||||
sort_direction: SortDirection::Desc,
|
||||
search_term: None,
|
||||
},
|
||||
/*include_thread_id_tiebreaker*/ false,
|
||||
);
|
||||
builder.push(" AND threads.memory_mode = 'enabled'");
|
||||
builder
|
||||
|
||||
@@ -375,6 +375,7 @@ ON CONFLICT(child_thread_id) DO NOTHING
|
||||
sort_direction: SortDirection::Desc,
|
||||
search_term: None,
|
||||
},
|
||||
/*include_thread_id_tiebreaker*/ false,
|
||||
);
|
||||
builder.push(" AND threads.title = ");
|
||||
builder.push_bind(title);
|
||||
@@ -387,6 +388,7 @@ ON CONFLICT(child_thread_id) DO NOTHING
|
||||
crate::SortKey::UpdatedAt,
|
||||
SortDirection::Desc,
|
||||
OrderByIndex::Enabled,
|
||||
/*include_thread_id_tiebreaker*/ false,
|
||||
/*limit*/ 1,
|
||||
);
|
||||
|
||||
@@ -401,7 +403,7 @@ ON CONFLICT(child_thread_id) DO NOTHING
|
||||
page_size: usize,
|
||||
filters: ThreadFilterOptions<'_>,
|
||||
) -> anyhow::Result<crate::ThreadsPage> {
|
||||
self.list_threads_matching(page_size, filters, /*parent_thread_id*/ None)
|
||||
self.list_threads_matching(page_size, filters, /*relation_filter*/ None)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -412,7 +414,22 @@ ON CONFLICT(child_thread_id) DO NOTHING
|
||||
parent_thread_id: ThreadId,
|
||||
filters: ThreadFilterOptions<'_>,
|
||||
) -> anyhow::Result<crate::ThreadsPage> {
|
||||
self.list_threads_matching(page_size, filters, Some(parent_thread_id))
|
||||
self.list_threads_by_relation(
|
||||
page_size,
|
||||
crate::ThreadRelationFilter::DirectChildrenOf(parent_thread_id),
|
||||
filters,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// List threads matching a persisted spawn-graph relationship.
|
||||
pub async fn list_threads_by_relation(
|
||||
&self,
|
||||
page_size: usize,
|
||||
relation_filter: crate::ThreadRelationFilter,
|
||||
filters: ThreadFilterOptions<'_>,
|
||||
) -> anyhow::Result<crate::ThreadsPage> {
|
||||
self.list_threads_matching(page_size, filters, Some(relation_filter))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -420,29 +437,40 @@ ON CONFLICT(child_thread_id) DO NOTHING
|
||||
&self,
|
||||
page_size: usize,
|
||||
filters: ThreadFilterOptions<'_>,
|
||||
parent_thread_id: Option<ThreadId>,
|
||||
relation_filter: Option<crate::ThreadRelationFilter>,
|
||||
) -> anyhow::Result<crate::ThreadsPage> {
|
||||
let limit = page_size.saturating_add(1);
|
||||
|
||||
let mut builder = QueryBuilder::<Sqlite>::new("");
|
||||
push_list_threads_query(&mut builder, filters, parent_thread_id, limit);
|
||||
push_list_threads_query(&mut builder, filters, relation_filter, limit);
|
||||
|
||||
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
|
||||
let mut items = rows
|
||||
.into_iter()
|
||||
.map(|row| ThreadRow::try_from_row(&row).and_then(ThreadMetadata::try_from))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let mut items = Vec::with_capacity(rows.len());
|
||||
let mut parent_thread_ids = std::collections::HashMap::new();
|
||||
for row in rows {
|
||||
let item = ThreadRow::try_from_row(&row).and_then(ThreadMetadata::try_from)?;
|
||||
if relation_filter.is_some()
|
||||
&& let Some(parent_thread_id) =
|
||||
row.try_get::<Option<String>, _>("parent_thread_id")?
|
||||
{
|
||||
parent_thread_ids.insert(item.id, ThreadId::try_from(parent_thread_id)?);
|
||||
}
|
||||
items.push(item);
|
||||
}
|
||||
let num_scanned_rows = items.len();
|
||||
let next_anchor = if items.len() > page_size {
|
||||
items.pop();
|
||||
items
|
||||
.last()
|
||||
.and_then(|item| anchor_from_item(item, filters.sort_key))
|
||||
if let Some(overflow_item) = items.pop() {
|
||||
parent_thread_ids.remove(&overflow_item.id);
|
||||
}
|
||||
items.last().and_then(|item| {
|
||||
anchor_from_item(item, filters.sort_key, relation_filter.is_some())
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(ThreadsPage {
|
||||
items,
|
||||
parent_thread_ids,
|
||||
next_anchor,
|
||||
num_scanned_rows,
|
||||
})
|
||||
@@ -471,12 +499,14 @@ ON CONFLICT(child_thread_id) DO NOTHING
|
||||
sort_direction: SortDirection::Desc,
|
||||
search_term: None,
|
||||
},
|
||||
sort_key == crate::SortKey::RecencyAt,
|
||||
);
|
||||
push_thread_order_and_limit(
|
||||
&mut builder,
|
||||
sort_key,
|
||||
SortDirection::Desc,
|
||||
OrderByIndex::Enabled,
|
||||
sort_key == crate::SortKey::RecencyAt,
|
||||
limit,
|
||||
);
|
||||
|
||||
@@ -1097,30 +1127,71 @@ fn one_thread_id_from_rows(
|
||||
fn push_list_threads_query(
|
||||
builder: &mut QueryBuilder<Sqlite>,
|
||||
filters: ThreadFilterOptions<'_>,
|
||||
parent_thread_id: Option<ThreadId>,
|
||||
relation_filter: Option<crate::ThreadRelationFilter>,
|
||||
limit: usize,
|
||||
) {
|
||||
push_thread_select_columns(builder);
|
||||
builder.push(" FROM threads");
|
||||
push_thread_filters(builder, filters);
|
||||
if let Some(parent_thread_id) = parent_thread_id {
|
||||
if let Some(crate::ThreadRelationFilter::DescendantsOf(ancestor_thread_id)) = relation_filter {
|
||||
builder.push(
|
||||
" AND threads.id IN (SELECT child_thread_id FROM thread_spawn_edges WHERE parent_thread_id = ",
|
||||
r#"
|
||||
WITH RECURSIVE subtree(child_thread_id, parent_thread_id) AS (
|
||||
SELECT child_thread_id, parent_thread_id
|
||||
FROM thread_spawn_edges
|
||||
WHERE parent_thread_id =
|
||||
"#,
|
||||
);
|
||||
builder.push_bind(ancestor_thread_id.to_string());
|
||||
builder.push(
|
||||
r#"
|
||||
UNION
|
||||
SELECT edge.child_thread_id, edge.parent_thread_id
|
||||
FROM thread_spawn_edges AS edge
|
||||
JOIN subtree ON edge.parent_thread_id = subtree.child_thread_id
|
||||
)
|
||||
"#,
|
||||
);
|
||||
builder.push_bind(parent_thread_id.to_string());
|
||||
builder.push(")");
|
||||
}
|
||||
let order_by_index = match filters.cwd_filters {
|
||||
push_thread_select_columns(builder);
|
||||
// SQLite may otherwise reorder these joins and scan the global timestamp index before
|
||||
// checking the relationship. CROSS JOIN keeps the selective edge/subtree traversal first.
|
||||
match relation_filter {
|
||||
Some(crate::ThreadRelationFilter::DirectChildrenOf(_)) => builder.push(
|
||||
", listed_edge.parent_thread_id AS parent_thread_id\nFROM thread_spawn_edges AS listed_edge\nCROSS JOIN threads ON threads.id = listed_edge.child_thread_id",
|
||||
),
|
||||
Some(crate::ThreadRelationFilter::DescendantsOf(_)) => builder.push(
|
||||
", subtree.parent_thread_id AS parent_thread_id\nFROM subtree\nCROSS JOIN threads ON threads.id = subtree.child_thread_id",
|
||||
),
|
||||
None => builder.push(" FROM threads"),
|
||||
};
|
||||
let include_thread_id_tiebreaker =
|
||||
relation_filter.is_some() || filters.sort_key == SortKey::RecencyAt;
|
||||
push_thread_filters(builder, filters, include_thread_id_tiebreaker);
|
||||
match relation_filter {
|
||||
Some(crate::ThreadRelationFilter::DirectChildrenOf(parent_thread_id)) => {
|
||||
builder.push(" AND listed_edge.parent_thread_id = ");
|
||||
builder.push_bind(parent_thread_id.to_string());
|
||||
}
|
||||
Some(crate::ThreadRelationFilter::DescendantsOf(ancestor_thread_id)) => {
|
||||
builder.push(" AND subtree.child_thread_id != ");
|
||||
builder.push_bind(ancestor_thread_id.to_string());
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
let order_by_index = match (relation_filter, filters.cwd_filters) {
|
||||
// Relationship listings are expected to be much smaller than the global thread table.
|
||||
// Prefer the spawn-edge index and sort the matching subtree instead of scanning the
|
||||
// timestamp index until enough related threads happen to be found.
|
||||
(Some(_), _) => OrderByIndex::Disabled,
|
||||
// Multi-cwd listing is supported but at the time of writing has no current use in production.
|
||||
// Preserve its query plan so the global timestamp index does not regress cwd filtering into a scan.
|
||||
Some(cwd_filters) if cwd_filters.len() > 1 => OrderByIndex::Disabled,
|
||||
Some(_) | None => OrderByIndex::Enabled,
|
||||
(None, Some(cwd_filters)) if cwd_filters.len() > 1 => OrderByIndex::Disabled,
|
||||
(None, Some(_) | None) => OrderByIndex::Enabled,
|
||||
};
|
||||
push_thread_order_and_limit(
|
||||
builder,
|
||||
filters.sort_key,
|
||||
filters.sort_direction,
|
||||
order_by_index,
|
||||
include_thread_id_tiebreaker,
|
||||
limit,
|
||||
);
|
||||
}
|
||||
@@ -1191,6 +1262,7 @@ pub struct ThreadFilterOptions<'a> {
|
||||
pub(super) fn push_thread_filters<'a>(
|
||||
builder: &mut QueryBuilder<Sqlite>,
|
||||
options: ThreadFilterOptions<'a>,
|
||||
include_thread_id_tiebreaker: bool,
|
||||
) {
|
||||
let ThreadFilterOptions {
|
||||
archived_only,
|
||||
@@ -1265,9 +1337,7 @@ pub(super) fn push_thread_filters<'a>(
|
||||
builder.push(operator);
|
||||
builder.push(" ");
|
||||
builder.push_bind(anchor_ts);
|
||||
if sort_key == SortKey::RecencyAt
|
||||
&& let Some(anchor_id) = anchor.id
|
||||
{
|
||||
if include_thread_id_tiebreaker && let Some(anchor_id) = anchor.id {
|
||||
builder.push(" OR (");
|
||||
builder.push(column);
|
||||
builder.push(" = ");
|
||||
@@ -1297,6 +1367,7 @@ pub(super) fn push_thread_order_and_limit(
|
||||
sort_key: SortKey,
|
||||
sort_direction: SortDirection,
|
||||
order_by_index: OrderByIndex,
|
||||
include_thread_id_tiebreaker: bool,
|
||||
limit: usize,
|
||||
) {
|
||||
let order_column = match sort_key {
|
||||
@@ -1318,7 +1389,7 @@ pub(super) fn push_thread_order_and_limit(
|
||||
builder.push(order_column);
|
||||
builder.push(" ");
|
||||
builder.push(order_direction);
|
||||
if sort_key == SortKey::RecencyAt {
|
||||
if include_thread_id_tiebreaker {
|
||||
builder.push(", threads.id ");
|
||||
builder.push(order_direction);
|
||||
}
|
||||
@@ -1814,7 +1885,7 @@ mod tests {
|
||||
sort_direction: SortDirection::Desc,
|
||||
search_term: None,
|
||||
},
|
||||
/*parent_thread_id*/ None,
|
||||
/*relation_filter*/ None,
|
||||
/*limit*/ 201,
|
||||
);
|
||||
let plan_details = builder
|
||||
@@ -1844,7 +1915,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_threads_by_parent_filters_direct_children_with_keyset_pagination() {
|
||||
async fn list_threads_by_relation_filters_spawn_graph_with_keyset_pagination() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
|
||||
.await
|
||||
@@ -1857,7 +1928,8 @@ mod tests {
|
||||
let grandchild_id = ThreadId::new();
|
||||
|
||||
for (thread_id, created_at) in [
|
||||
(first_child_id, 1_700_000_100),
|
||||
(parent_id, 1_700_000_000),
|
||||
(first_child_id, 1_700_000_200),
|
||||
(second_child_id, 1_700_000_200),
|
||||
(grandchild_id, 1_700_000_300),
|
||||
] {
|
||||
@@ -1893,6 +1965,37 @@ mod tests {
|
||||
.expect("spawn edge insert should succeed");
|
||||
}
|
||||
|
||||
let mut builder = QueryBuilder::<Sqlite>::new("EXPLAIN QUERY PLAN ");
|
||||
push_list_threads_query(
|
||||
&mut builder,
|
||||
ThreadFilterOptions {
|
||||
archived_only: false,
|
||||
allowed_sources: &[],
|
||||
model_providers: None,
|
||||
cwd_filters: None,
|
||||
anchor: None,
|
||||
sort_key: SortKey::CreatedAt,
|
||||
sort_direction: SortDirection::Desc,
|
||||
search_term: None,
|
||||
},
|
||||
Some(crate::ThreadRelationFilter::DescendantsOf(parent_id)),
|
||||
/*limit*/ 10,
|
||||
);
|
||||
let plan_details = builder
|
||||
.build()
|
||||
.fetch_all(runtime.pool.as_ref())
|
||||
.await
|
||||
.expect("relationship query plan should load")
|
||||
.into_iter()
|
||||
.map(|row| row.get::<String, _>("detail"))
|
||||
.collect::<Vec<_>>();
|
||||
assert!(
|
||||
plan_details
|
||||
.iter()
|
||||
.any(|detail| detail.contains("idx_thread_spawn_edges_parent_status")),
|
||||
"spawn relationship query did not use the parent index: {plan_details:?}"
|
||||
);
|
||||
|
||||
let filters = |anchor| ThreadFilterOptions {
|
||||
archived_only: false,
|
||||
allowed_sources: &[],
|
||||
@@ -1933,6 +2036,76 @@ mod tests {
|
||||
vec![first_child_id]
|
||||
);
|
||||
assert_eq!(second_page.next_anchor, None);
|
||||
|
||||
let first_descendant_page = runtime
|
||||
.list_threads_by_relation(
|
||||
/*page_size*/ 2,
|
||||
crate::ThreadRelationFilter::DescendantsOf(parent_id),
|
||||
filters(None),
|
||||
)
|
||||
.await
|
||||
.expect("first descendant page should succeed");
|
||||
let second_descendant_page = runtime
|
||||
.list_threads_by_relation(
|
||||
/*page_size*/ 2,
|
||||
crate::ThreadRelationFilter::DescendantsOf(parent_id),
|
||||
filters(first_descendant_page.next_anchor.as_ref()),
|
||||
)
|
||||
.await
|
||||
.expect("second descendant page should succeed");
|
||||
assert_eq!(
|
||||
(
|
||||
first_descendant_page
|
||||
.items
|
||||
.iter()
|
||||
.map(|item| item.id)
|
||||
.collect::<Vec<_>>(),
|
||||
second_descendant_page
|
||||
.items
|
||||
.iter()
|
||||
.map(|item| item.id)
|
||||
.collect::<Vec<_>>(),
|
||||
first_descendant_page.parent_thread_ids,
|
||||
second_descendant_page.parent_thread_ids,
|
||||
second_descendant_page.next_anchor,
|
||||
),
|
||||
(
|
||||
vec![grandchild_id, second_child_id],
|
||||
vec![first_child_id],
|
||||
[
|
||||
(grandchild_id, first_child_id),
|
||||
(second_child_id, parent_id)
|
||||
]
|
||||
.into(),
|
||||
[(first_child_id, parent_id)].into(),
|
||||
None,
|
||||
)
|
||||
);
|
||||
|
||||
runtime
|
||||
.upsert_thread_spawn_edge(
|
||||
grandchild_id,
|
||||
parent_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await
|
||||
.expect("cycle-closing spawn edge insert should succeed");
|
||||
let cyclic_descendants = runtime
|
||||
.list_threads_by_relation(
|
||||
/*page_size*/ 10,
|
||||
crate::ThreadRelationFilter::DescendantsOf(parent_id),
|
||||
filters(None),
|
||||
)
|
||||
.await
|
||||
.expect("cyclic descendant graph should terminate");
|
||||
assert_eq!(
|
||||
cyclic_descendants
|
||||
.items
|
||||
.iter()
|
||||
.map(|item| item.id)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![grandchild_id, second_child_id, first_child_id]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
@@ -29,6 +30,7 @@ use crate::StoredThread;
|
||||
use crate::StoredThreadHistory;
|
||||
use crate::ThreadMetadataPatch;
|
||||
use crate::ThreadPage;
|
||||
use crate::ThreadRelationFilter;
|
||||
use crate::ThreadStore;
|
||||
use crate::ThreadStoreError;
|
||||
use crate::ThreadStoreFuture;
|
||||
@@ -97,17 +99,20 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_threads_filters_by_parent_thread_id() {
|
||||
async fn list_threads_filters_by_spawn_relationship() {
|
||||
let store = InMemoryThreadStore::default();
|
||||
let parent_thread_id = ThreadId::default();
|
||||
let child_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000001").expect("valid thread id");
|
||||
let unrelated_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000002").expect("valid thread id");
|
||||
let grandchild_thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000003").expect("valid thread id");
|
||||
|
||||
for (thread_id, parent_thread_id) in [
|
||||
(child_thread_id, Some(parent_thread_id)),
|
||||
(unrelated_thread_id, None),
|
||||
(grandchild_thread_id, Some(child_thread_id)),
|
||||
] {
|
||||
store
|
||||
.create_thread(CreateThreadParams {
|
||||
@@ -145,7 +150,7 @@ mod tests {
|
||||
cwd_filters: None,
|
||||
archived: false,
|
||||
search_term: None,
|
||||
parent_thread_id: Some(parent_thread_id),
|
||||
relation_filter: Some(ThreadRelationFilter::DirectChildrenOf(parent_thread_id)),
|
||||
use_state_db_only: false,
|
||||
},
|
||||
)
|
||||
@@ -159,6 +164,33 @@ mod tests {
|
||||
.collect::<Vec<_>>(),
|
||||
vec![child_thread_id]
|
||||
);
|
||||
|
||||
let page = ThreadStore::list_threads(
|
||||
&store,
|
||||
ListThreadsParams {
|
||||
page_size: 10,
|
||||
cursor: None,
|
||||
sort_key: ThreadSortKey::CreatedAt,
|
||||
sort_direction: SortDirection::Desc,
|
||||
allowed_sources: Vec::new(),
|
||||
model_providers: None,
|
||||
cwd_filters: None,
|
||||
archived: false,
|
||||
search_term: None,
|
||||
relation_filter: Some(ThreadRelationFilter::DescendantsOf(parent_thread_id)),
|
||||
use_state_db_only: false,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("list descendant threads");
|
||||
|
||||
assert_eq!(
|
||||
page.items
|
||||
.into_iter()
|
||||
.map(|item| item.thread_id)
|
||||
.collect::<HashSet<_>>(),
|
||||
HashSet::from([child_thread_id, grandchild_thread_id])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -463,9 +495,33 @@ impl ThreadStore for InMemoryThreadStore {
|
||||
fn list_threads(&self, params: ListThreadsParams) -> ThreadStoreFuture<'_, ThreadPage> {
|
||||
Box::pin(async move {
|
||||
let mut page = InMemoryThreadStore::list_threads(self).await?;
|
||||
if let Some(parent_thread_id) = params.parent_thread_id {
|
||||
page.items
|
||||
.retain(|thread| thread.parent_thread_id == Some(parent_thread_id));
|
||||
match params.relation_filter {
|
||||
Some(ThreadRelationFilter::DirectChildrenOf(parent_thread_id)) => {
|
||||
page.items
|
||||
.retain(|thread| thread.parent_thread_id == Some(parent_thread_id));
|
||||
}
|
||||
Some(ThreadRelationFilter::DescendantsOf(ancestor_thread_id)) => {
|
||||
let mut subtree = HashSet::from([ancestor_thread_id]);
|
||||
loop {
|
||||
let mut discovered = false;
|
||||
for thread in &page.items {
|
||||
if thread
|
||||
.parent_thread_id
|
||||
.is_some_and(|parent_thread_id| subtree.contains(&parent_thread_id))
|
||||
{
|
||||
discovered |= subtree.insert(thread.thread_id);
|
||||
}
|
||||
}
|
||||
if !discovered {
|
||||
break;
|
||||
}
|
||||
}
|
||||
page.items.retain(|thread| {
|
||||
thread.thread_id != ancestor_thread_id
|
||||
&& subtree.contains(&thread.thread_id)
|
||||
});
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
Ok(page)
|
||||
})
|
||||
|
||||
@@ -50,6 +50,7 @@ pub use types::StoredTurnStatus;
|
||||
pub use types::ThreadMetadataPatch;
|
||||
pub use types::ThreadPage;
|
||||
pub use types::ThreadPersistenceMetadata;
|
||||
pub use types::ThreadRelationFilter;
|
||||
pub use types::ThreadSearchPage;
|
||||
pub use types::ThreadSortKey;
|
||||
pub use types::TurnPage;
|
||||
|
||||
@@ -110,7 +110,7 @@ mod tests {
|
||||
cwd_filters: None,
|
||||
archived: true,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
use_state_db_only: false,
|
||||
})
|
||||
.await
|
||||
|
||||
@@ -14,6 +14,7 @@ use super::helpers::stored_thread_from_rollout_item;
|
||||
use crate::ListThreadsParams;
|
||||
use crate::SortDirection;
|
||||
use crate::ThreadPage;
|
||||
use crate::ThreadRelationFilter;
|
||||
use crate::ThreadSortKey;
|
||||
use crate::ThreadStoreError;
|
||||
use crate::ThreadStoreResult;
|
||||
@@ -117,7 +118,15 @@ pub(super) async fn list_rollout_threads(
|
||||
sort_key: codex_rollout::ThreadSortKey,
|
||||
sort_direction: codex_rollout::SortDirection,
|
||||
) -> ThreadStoreResult<codex_rollout::ThreadsPage> {
|
||||
if let Some(parent_thread_id) = params.parent_thread_id {
|
||||
if let Some(relation_filter) = params.relation_filter {
|
||||
let relation_filter = match relation_filter {
|
||||
ThreadRelationFilter::DirectChildrenOf(parent_thread_id) => {
|
||||
codex_state::ThreadRelationFilter::DirectChildrenOf(parent_thread_id)
|
||||
}
|
||||
ThreadRelationFilter::DescendantsOf(ancestor_thread_id) => {
|
||||
codex_state::ThreadRelationFilter::DescendantsOf(ancestor_thread_id)
|
||||
}
|
||||
};
|
||||
let page = codex_rollout::state_db::list_threads_db(
|
||||
state_db.as_deref(),
|
||||
config.codex_home.as_path(),
|
||||
@@ -128,19 +137,15 @@ pub(super) async fn list_rollout_threads(
|
||||
params.allowed_sources.as_slice(),
|
||||
params.model_providers.as_deref(),
|
||||
params.cwd_filters.as_deref(),
|
||||
Some(parent_thread_id),
|
||||
Some(relation_filter),
|
||||
params.archived,
|
||||
params.search_term.as_deref(),
|
||||
)
|
||||
.await
|
||||
.ok_or_else(|| ThreadStoreError::Internal {
|
||||
message: "state DB unavailable for parent-filtered thread listing".to_string(),
|
||||
message: "state DB unavailable for relationship-filtered thread listing".to_string(),
|
||||
})?;
|
||||
let mut page: codex_rollout::ThreadsPage = page.into();
|
||||
for item in &mut page.items {
|
||||
item.parent_thread_id = Some(parent_thread_id);
|
||||
}
|
||||
return Ok(page);
|
||||
return Ok(page.into());
|
||||
}
|
||||
|
||||
let page = if params.use_state_db_only && params.archived {
|
||||
@@ -252,7 +257,7 @@ mod tests {
|
||||
cwd_filters: None,
|
||||
archived: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
use_state_db_only: false,
|
||||
})
|
||||
.await
|
||||
@@ -312,7 +317,7 @@ mod tests {
|
||||
cwd_filters: None,
|
||||
archived: false,
|
||||
search_term: Some("needle".to_string()),
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
use_state_db_only: true,
|
||||
})
|
||||
.await
|
||||
@@ -352,7 +357,7 @@ mod tests {
|
||||
cwd_filters: None,
|
||||
archived: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
use_state_db_only: false,
|
||||
})
|
||||
.await
|
||||
@@ -368,7 +373,7 @@ mod tests {
|
||||
cwd_filters: None,
|
||||
archived: true,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
use_state_db_only: false,
|
||||
})
|
||||
.await
|
||||
@@ -420,7 +425,7 @@ mod tests {
|
||||
cwd_filters: None,
|
||||
archived: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
use_state_db_only: false,
|
||||
})
|
||||
.await
|
||||
@@ -457,7 +462,7 @@ mod tests {
|
||||
cwd_filters: None,
|
||||
archived: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
use_state_db_only: false,
|
||||
})
|
||||
.await
|
||||
|
||||
@@ -98,7 +98,7 @@ pub(super) async fn search_threads(
|
||||
cwd_filters: None,
|
||||
archived: params.archived,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
use_state_db_only: state_db.is_some(),
|
||||
};
|
||||
let mut remaining_rollouts = matching_rollouts;
|
||||
|
||||
@@ -1495,7 +1495,7 @@ mod tests {
|
||||
cwd_filters: Some(vec![workspace]),
|
||||
archived: false,
|
||||
search_term: None,
|
||||
parent_thread_id: None,
|
||||
relation_filter: None,
|
||||
use_state_db_only: true,
|
||||
})
|
||||
.await
|
||||
|
||||
@@ -182,6 +182,15 @@ pub enum SortDirection {
|
||||
Desc,
|
||||
}
|
||||
|
||||
/// Spawn-graph relationship used to filter thread listings.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum ThreadRelationFilter {
|
||||
/// Return only threads whose immediate parent is the given thread.
|
||||
DirectChildrenOf(ThreadId),
|
||||
/// Return every thread transitively descended from the given thread.
|
||||
DescendantsOf(ThreadId),
|
||||
}
|
||||
|
||||
/// Parameters for listing threads.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ListThreadsParams {
|
||||
@@ -205,8 +214,8 @@ pub struct ListThreadsParams {
|
||||
pub archived: bool,
|
||||
/// Optional substring/full-text search term for thread title/preview.
|
||||
pub search_term: Option<String>,
|
||||
/// Optional direct parent thread filter.
|
||||
pub parent_thread_id: Option<ThreadId>,
|
||||
/// Optional spawn-graph relationship filter.
|
||||
pub relation_filter: Option<ThreadRelationFilter>,
|
||||
/// Return directly from the state DB without scanning JSONL rollouts to repair metadata.
|
||||
pub use_state_db_only: bool,
|
||||
}
|
||||
|
||||
@@ -627,6 +627,7 @@ async fn lookup_session_target_by_name_with_app_server(
|
||||
source_kinds: Some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]),
|
||||
archived: Some(false),
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
cwd: None,
|
||||
use_state_db_only: false,
|
||||
search_term: Some(name.to_string()),
|
||||
@@ -740,6 +741,7 @@ fn latest_session_lookup_params(
|
||||
source_kinds: Some(resume_source_kinds(include_non_interactive)),
|
||||
archived: Some(false),
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
cwd: cwd_filter.map(|cwd| ThreadListCwdFilter::One(cwd.to_string_lossy().to_string())),
|
||||
use_state_db_only: match lookup_mode {
|
||||
LatestSessionLookupMode::StateDbOnly => true,
|
||||
|
||||
@@ -1822,6 +1822,7 @@ fn thread_list_params(
|
||||
source_kinds: Some(crate::resume_source_kinds(include_non_interactive)),
|
||||
archived: Some(false),
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
cwd: cwd_filter.map(|cwd| ThreadListCwdFilter::One(cwd.to_string_lossy().into_owned())),
|
||||
use_state_db_only: false,
|
||||
search_term: None,
|
||||
|
||||
@@ -180,6 +180,7 @@ async fn lookup_session_by_exact_name(
|
||||
)),
|
||||
archived: Some(archived),
|
||||
parent_thread_id: None,
|
||||
ancestor_thread_id: None,
|
||||
cwd: None,
|
||||
use_state_db_only: false,
|
||||
search_term: search_term.map(str::to_string),
|
||||
|
||||
Reference in New Issue
Block a user