diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadListParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadListParams.ts index ce5b6a79b..ce2539af0 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadListParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadListParams.ts @@ -5,50 +5,40 @@ import type { SortDirection } from "./SortDirection"; import type { ThreadSortKey } from "./ThreadSortKey"; import type { ThreadSourceKind } from "./ThreadSourceKind"; -export type ThreadListParams = { -/** +export type ThreadListParams = {/** * Opaque pagination cursor returned by a previous call. */ -cursor?: string | null, -/** +cursor?: string | null, /** * Optional page size; defaults to a reasonable server-side value. */ -limit?: number | null, -/** +limit?: number | null, /** * Optional sort key; defaults to created_at. */ -sortKey?: ThreadSortKey | null, -/** +sortKey?: ThreadSortKey | null, /** * Optional sort direction; defaults to descending (newest first). */ -sortDirection?: SortDirection | null, -/** +sortDirection?: SortDirection | null, /** * Optional provider filter; when set, only sessions recorded under these * providers are returned. When present but empty, includes all providers. */ -modelProviders?: Array | null, -/** +modelProviders?: Array | null, /** * Optional source filter; when set, only sessions from these source kinds * are returned. When omitted or empty, defaults to interactive sources. */ -sourceKinds?: Array | null, -/** +sourceKinds?: Array | null, /** * Optional archived filter; when set to true, only archived threads are returned. * If false or null, only non-archived threads are returned. */ -archived?: boolean | null, -/** +archived?: boolean | null, /** * Optional cwd filter or filters; when set, only threads whose session cwd * exactly matches one of these paths are returned. */ -cwd?: string | Array | null, -/** +cwd?: string | Array | null, /** * If true, return from the state DB without scanning JSONL rollouts to * repair thread metadata. Omitted or false preserves scan-and-repair * behavior. */ -useStateDbOnly?: boolean, -/** +useStateDbOnly?: boolean, /** * Optional substring filter for the extracted thread title. */ -searchTerm?: string | null, }; +searchTerm?: string | null}; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 7b3c42759..50ed39364 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -614,6 +614,7 @@ client_request_definitions! { }, ThreadList => "thread/list" { params: v2::ThreadListParams, + inspect_params: true, serialization: None, response: v2::ThreadListResponse, }, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index 5b412c2ed..1343a2f06 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -1022,7 +1022,7 @@ pub struct ThreadRollbackResponse { pub thread: Thread, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, ExperimentalApi)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] pub struct ThreadListParams { @@ -1062,6 +1062,10 @@ pub struct ThreadListParams { /// Optional substring filter for the extracted thread title. #[ts(optional = nullable)] pub search_term: Option, + /// Optional direct parent thread filter. + #[experimental("thread/list.parentThreadId")] + #[ts(optional = nullable)] + pub parent_thread_id: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server-test-client/src/lib.rs b/codex-rs/app-server-test-client/src/lib.rs index e89a8dbe9..ddb8f994b 100644 --- a/codex-rs/app-server-test-client/src/lib.rs +++ b/codex-rs/app-server-test-client/src/lib.rs @@ -1129,6 +1129,7 @@ async fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u3 model_providers: None, source_kinds: None, archived: None, + parent_thread_id: None, cwd: None, use_state_db_only: false, search_term: None, diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 5840c52d0..0e8b5ba20 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -134,7 +134,7 @@ Example with notification opt-out: - `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. Accepts the same permission override rules as `thread/start`. - `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`. - `thread/start`, `thread/resume`, and `thread/fork` responses include the legacy `sandbox` compatibility projection. Experimental clients can read `runtimeWorkspaceRoots` for the thread-scoped runtime roots and `activePermissionProfile` for the named or implicit built-in profile identity/provenance when known. -- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. Subagent threads also include `parentThreadId` when the immediate control/spawn parent is known. +- `thread/list` — page through stored threads; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Experimental clients can use `parentThreadId` to filter direct spawned children represented by persisted spawn-edge state. Review and Guardian threads are not included because they do not participate in that spawn-edge lifecycle. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. Subagent threads also include `parentThreadId` when the immediate parent is known. - `thread/loaded/list` — list the thread ids currently loaded in memory. - `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. - `thread/turns/list` — experimental; page through a stored thread’s turn history without resuming it; supports cursor-based pagination with `sortDirection`, `itemsView`, `nextCursor`, and `backwardsCursor`. @@ -387,6 +387,24 @@ Example: When `nextCursor` is `null`, you’ve reached the final page. +### Example: List direct child threads + +Enable `capabilities.experimentalApi` during initialization, then use `thread/list` with `parentThreadId` to page through a thread's direct spawned children from persisted spawn-edge state. Results do not recursively include grandchildren. Review and Guardian threads are not included because they do not participate in the spawn-edge lifecycle. When `modelProviders` or `sourceKinds` is omitted, parent-filtered requests include every provider or source kind, respectively. Explicit filters retain the ordinary `thread/list` behavior, including the interactive-only default for an empty `sourceKinds` list. + +```json +{ "method": "thread/list", "id": 21, "params": { + "parentThreadId": "00000000-0000-0000-0000-000000000100", + "limit": 25 +} } +{ "id": 21, "result": { + "data": [ + { "id": "00000000-0000-0000-0000-000000000101", "parentThreadId": "00000000-0000-0000-0000-000000000100", "status": { "type": "notLoaded" } } + ], + "nextCursor": null, + "backwardsCursor": null +} } +``` + ### Example: List loaded threads `thread/loaded/list` returns thread ids currently loaded in memory. This is useful when you want to check which sessions are active without scanning rollouts on disk. diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index bd354753e..1c733ed1e 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -16,6 +16,7 @@ struct ThreadListFilters { cwd_filters: Option>, search_term: Option, use_state_db_only: bool, + parent_thread_id: Option, } fn collect_resume_override_mismatches( @@ -1875,8 +1876,14 @@ impl ThreadRequestProcessor { cwd, use_state_db_only, search_term, + parent_thread_id, } = params; let cwd_filters = normalize_thread_list_cwd_filters(cwd)?; + let parent_thread_id = parent_thread_id + .as_deref() + .map(ThreadId::from_string) + .transpose() + .map_err(|err| invalid_request(format!("invalid parent thread id: {err}")))?; let requested_page_size = limit .map(|value| value as usize) @@ -1900,6 +1907,7 @@ impl ThreadRequestProcessor { cwd_filters, search_term, use_state_db_only, + parent_thread_id, }, ) .await?; @@ -3562,6 +3570,7 @@ impl ThreadRequestProcessor { cwd_filters, search_term, use_state_db_only, + parent_thread_id, } = filters; let mut cursor_obj = cursor; let mut last_cursor = cursor_obj.clone(); @@ -3577,9 +3586,15 @@ impl ThreadRequestProcessor { Some(providers) } } + None if parent_thread_id.is_some() => None, None => Some(vec![self.config.model_provider_id.clone()]), }; - let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds); + let (allowed_sources_vec, source_kind_filter) = + if parent_thread_id.is_some() && source_kinds.is_none() { + (Vec::new(), None) + } else { + compute_source_filters(source_kinds) + }; let allowed_sources = allowed_sources_vec.as_slice(); let store_sort_direction = match sort_direction { SortDirection::Asc => StoreSortDirection::Asc, @@ -3601,6 +3616,7 @@ impl ThreadRequestProcessor { archived, search_term: search_term.clone(), use_state_db_only, + parent_thread_id, }) .await .map_err(thread_store_list_error)?; diff --git a/codex-rs/app-server/tests/suite/v2/external_agent_config.rs b/codex-rs/app-server/tests/suite/v2/external_agent_config.rs index 991e12dc7..b75d703c4 100644 --- a/codex-rs/app-server/tests/suite/v2/external_agent_config.rs +++ b/codex-rs/app-server/tests/suite/v2/external_agent_config.rs @@ -338,6 +338,7 @@ async fn external_agent_config_import_creates_session_rollouts() -> Result<()> { cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let response: JSONRPCResponse = timeout( @@ -514,6 +515,7 @@ required = true cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let response: JSONRPCResponse = timeout( @@ -601,6 +603,7 @@ async fn external_agent_config_import_accepts_detected_session_payload_after_res cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let response: JSONRPCResponse = timeout( @@ -688,6 +691,7 @@ async fn external_agent_config_import_skips_already_imported_session_versions() cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let response: JSONRPCResponse = timeout( @@ -827,6 +831,7 @@ async fn external_agent_config_import_returns_before_background_session_import_f cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let response: JSONRPCResponse = timeout( @@ -912,6 +917,7 @@ async fn external_agent_config_import_rejects_undetected_session_paths() -> Resu cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let response: JSONRPCResponse = timeout( @@ -1036,6 +1042,7 @@ async fn external_agent_config_import_compacts_huge_session_before_first_follow_ cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let response: JSONRPCResponse = timeout( diff --git a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs index 18d1dcd43..baf11aff5 100644 --- a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs +++ b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs @@ -132,6 +132,7 @@ async fn thread_delete_with_non_local_thread_store_does_not_create_local_persist cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }, }) .await? diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index a335b441f..4a650bcd4 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -71,6 +71,7 @@ async fn list_threads(mcp: &mut TestAppServer) -> Result { cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let list_resp: JSONRPCResponse = timeout( diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 0b4964712..3f6adc167 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -35,6 +35,7 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionSource as CoreSessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_state::DirectionalThreadSpawnEdgeStatus; use core_test_support::responses; use pretty_assertions::assert_eq; use std::cmp::Reverse; @@ -95,6 +96,7 @@ async fn list_threads_with_sort( cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let resp: JSONRPCResponse = timeout( @@ -105,6 +107,37 @@ async fn list_threads_with_sort( to_response::(resp) } +async fn list_threads_for_parent( + mcp: &mut TestAppServer, + parent_thread_id: ThreadId, + cursor: Option, + limit: u32, + model_providers: Option>, + source_kinds: Option>, +) -> Result { + let request_id = mcp + .send_thread_list_request(codex_app_server_protocol::ThreadListParams { + cursor, + limit: Some(limit), + sort_key: None, + sort_direction: None, + model_providers, + source_kinds, + archived: None, + cwd: None, + use_state_db_only: false, + search_term: None, + parent_thread_id: Some(parent_thread_id.to_string()), + }) + .await?; + let response = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + to_response::(response) +} + fn create_fake_rollouts( codex_home: &Path, count: usize, @@ -537,6 +570,7 @@ async fn thread_list_respects_cwd_filters() -> Result<()> { ])), use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let resp: JSONRPCResponse = timeout( @@ -646,6 +680,7 @@ sqlite = true cwd: None, use_state_db_only: false, search_term: Some("needle".to_string()), + parent_thread_id: None, }) .await?; let resp: JSONRPCResponse = timeout( @@ -862,6 +897,7 @@ sqlite = true cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let resp: JSONRPCResponse = timeout( @@ -900,6 +936,7 @@ sqlite = true )), use_state_db_only: true, search_term: None, + parent_thread_id: None, }) .await?; let resp: JSONRPCResponse = timeout( @@ -929,6 +966,7 @@ sqlite = true )), use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let resp: JSONRPCResponse = timeout( @@ -942,6 +980,162 @@ sqlite = true Ok(()) } +#[tokio::test] +async fn thread_list_parent_filter_reads_direct_children_from_state_db() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + let parent_id = ThreadId::new(); + let older_child_id = ThreadId::new(); + let newer_child_id = ThreadId::new(); + let grandchild_id = ThreadId::new(); + let state_db = codex_state::StateRuntime::init( + codex_home.path().to_path_buf(), + "mock_provider".to_string(), + ) + .await?; + for (thread_id, created_at, source, model_provider) in [ + ( + older_child_id, + "2025-02-01T10:00:00Z", + CoreSessionSource::SubAgent(SubAgentSource::Other("agent_job:job-1".to_string())), + "other_provider", + ), + ( + newer_child_id, + "2025-02-01T11:00:00Z", + CoreSessionSource::Cli, + "mock_provider", + ), + ( + grandchild_id, + "2025-02-01T12:00:00Z", + CoreSessionSource::SubAgent(SubAgentSource::Other("agent_job:job-2".to_string())), + "mock_provider", + ), + ] { + let created_at = DateTime::parse_from_rfc3339(created_at)?.with_timezone(&Utc); + let mut builder = codex_state::ThreadMetadataBuilder::new( + thread_id, + codex_home.path().join(format!("{thread_id}.jsonl")), + created_at, + source, + ); + builder.model_provider = Some(model_provider.to_string()); + builder.cwd = codex_home.path().to_path_buf(); + builder.cli_version = Some("0.0.0".to_string()); + let mut metadata = builder.build(model_provider); + metadata.preview = Some("child thread".to_string()); + metadata.first_user_message = metadata.preview.clone(); + state_db.upsert_thread(&metadata).await?; + } + for (parent_thread_id, child_thread_id) in [ + (parent_id, older_child_id), + (parent_id, newer_child_id), + (newer_child_id, grandchild_id), + ] { + state_db + .upsert_thread_spawn_edge( + parent_thread_id, + child_thread_id, + DirectionalThreadSpawnEdgeStatus::Open, + ) + .await?; + } + state_db + .mark_backfill_complete(/*last_watermark*/ None) + .await?; + let mut mcp = init_mcp(codex_home.path()).await?; + + let first_page = list_threads_for_parent( + &mut mcp, parent_id, /*cursor*/ None, /*limit*/ 1, /*model_providers*/ None, + /*source_kinds*/ None, + ) + .await?; + let second_page = list_threads_for_parent( + &mut mcp, + parent_id, + first_page.next_cursor.clone(), + /*limit*/ 1, + /*model_providers*/ None, + /*source_kinds*/ None, + ) + .await?; + + assert_eq!( + first_page + .data + .iter() + .map(|thread| thread.id.clone()) + .collect::>(), + vec![newer_child_id.to_string()] + ); + assert_eq!( + second_page + .data + .iter() + .map(|thread| thread.id.clone()) + .collect::>(), + vec![older_child_id.to_string()] + ); + assert_eq!(second_page.next_cursor, None); + let expected_parent_id = parent_id.to_string(); + assert!( + first_page + .data + .iter() + .chain(&second_page.data) + .all(|thread| thread.parent_thread_id.as_deref() == Some(expected_parent_id.as_str())) + ); + let interactive_only = list_threads_for_parent( + &mut mcp, + parent_id, + /*cursor*/ None, + /*limit*/ 10, + /*model_providers*/ None, + /*source_kinds*/ Some(Vec::new()), + ) + .await?; + assert_eq!( + interactive_only + .data + .iter() + .map(|thread| thread.id.clone()) + .collect::>(), + vec![newer_child_id.to_string()] + ); + Ok(()) +} + +#[tokio::test] +async fn thread_list_parent_filter_rejects_malformed_thread_id() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + let mut mcp = init_mcp(codex_home.path()).await?; + let request_id = mcp + .send_thread_list_request(codex_app_server_protocol::ThreadListParams { + cursor: None, + limit: Some(10), + sort_key: None, + sort_direction: None, + model_providers: None, + source_kinds: None, + archived: None, + cwd: None, + use_state_db_only: false, + search_term: None, + parent_thread_id: Some("not-a-thread-id".to_string()), + }) + .await?; + let error = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + assert_eq!(error.error.code, -32600); + + Ok(()) +} + #[tokio::test] async fn thread_list_empty_source_kinds_defaults_to_interactive_only() -> Result<()> { let codex_home = TempDir::new()?; @@ -1629,6 +1823,7 @@ async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<() cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let resp: JSONRPCResponse = timeout( @@ -1671,6 +1866,7 @@ async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<() cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let resp: JSONRPCResponse = timeout( @@ -1909,6 +2105,7 @@ async fn thread_list_invalid_cursor_returns_error() -> Result<()> { cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let error: JSONRPCError = timeout( diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 272ecac79..a6ed5e4a6 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -566,6 +566,7 @@ async fn thread_list_includes_store_thread_without_rollout_path() -> Result<()> cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }, }) .await? @@ -960,6 +961,7 @@ async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> { cwd: None, use_state_db_only: false, search_term: None, + parent_thread_id: None, }) .await?; let list_resp: JSONRPCResponse = timeout( diff --git a/codex-rs/core/src/personality_migration.rs b/codex-rs/core/src/personality_migration.rs index 7109b58a4..2b49e0680 100644 --- a/codex-rs/core/src/personality_migration.rs +++ b/codex-rs/core/src/personality_migration.rs @@ -88,6 +88,7 @@ async fn has_threads(store: &LocalThreadStore, archived: bool) -> io::Result Vec { allowed_sources: Vec::new(), model_providers: None, cwd_filters: None, + parent_thread_id: None, archived: false, search_term: None, use_state_db_only: false, diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 6578523e9..f151d38ec 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -1449,6 +1449,7 @@ async fn resolve_resume_thread_id( model_providers: model_providers.clone(), source_kinds: Some(all_thread_source_kinds()), archived: Some(false), + parent_thread_id: None, cwd: None, use_state_db_only: false, search_term: None, @@ -1514,6 +1515,7 @@ async fn resolve_resume_thread_id( model_providers: model_providers.clone(), source_kinds: Some(all_thread_source_kinds()), archived: Some(false), + parent_thread_id: None, cwd: None, use_state_db_only: false, search_term: Some(session_id.to_string()), diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 9ade47489..5dc207049 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -376,6 +376,7 @@ impl RolloutRecorder { allowed_sources, model_providers, cwd_filters, + /*parent_thread_id*/ None, archived, search_term, ) @@ -484,6 +485,7 @@ impl RolloutRecorder { allowed_sources, model_providers, cwd_filters, + /*parent_thread_id*/ None, archived, search_term, ) @@ -512,6 +514,7 @@ impl RolloutRecorder { allowed_sources, model_providers, cwd_filters, + /*parent_thread_id*/ None, archived, search_term, ) @@ -608,6 +611,7 @@ impl RolloutRecorder { allowed_sources, model_providers, cwd_filter.as_ref().map(std::slice::from_ref), + /*parent_thread_id*/ None, /*archived*/ false, /*search_term*/ None, ) diff --git a/codex-rs/rollout/src/state_db.rs b/codex-rs/rollout/src/state_db.rs index c21d7002b..ab93305fa 100644 --- a/codex-rs/rollout/src/state_db.rs +++ b/codex-rs/rollout/src/state_db.rs @@ -363,6 +363,7 @@ pub async fn list_threads_db( allowed_sources: &[SessionSource], model_providers: Option<&[String]>, cwd_filters: Option<&[PathBuf]>, + parent_thread_id: Option, archived: bool, search_term: Option<&str>, ) -> Option { @@ -391,29 +392,35 @@ pub async fn list_threads_db( .map(|cwd| normalize_cwd_for_state_db(cwd)) .collect::>() }); - match ctx - .list_threads( - page_size, - codex_state::ThreadFilterOptions { - archived_only: archived, - allowed_sources: allowed_sources.as_slice(), - model_providers: model_providers.as_deref(), - cwd_filters: normalized_cwd_filters.as_deref(), - anchor: anchor.as_ref(), - sort_key: match sort_key { - ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt, - ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt, - }, - sort_direction: match sort_direction { - SortDirection::Asc => codex_state::SortDirection::Asc, - SortDirection::Desc => codex_state::SortDirection::Desc, - }, - search_term, - }, - ) - .await - { + let filters = codex_state::ThreadFilterOptions { + archived_only: archived, + allowed_sources: allowed_sources.as_slice(), + model_providers: model_providers.as_deref(), + cwd_filters: normalized_cwd_filters.as_deref(), + anchor: anchor.as_ref(), + sort_key: match sort_key { + ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt, + ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt, + }, + sort_direction: match sort_direction { + SortDirection::Asc => codex_state::SortDirection::Asc, + SortDirection::Desc => codex_state::SortDirection::Desc, + }, + search_term, + }; + let page = match parent_thread_id { + Some(parent_thread_id) => { + ctx.list_threads_by_parent(page_size, parent_thread_id, filters) + .await + } + None => ctx.list_threads(page_size, filters).await, + }; + match page { Ok(mut page) => { + // Parent-filtered listings intentionally treat persisted state as authoritative. + if parent_thread_id.is_some() { + return Some(page); + } let mut valid_items = Vec::with_capacity(page.items.len()); for item in page.items { if let Some(existing_path) = diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 40267f046..6cebf27e8 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -398,11 +398,32 @@ ON CONFLICT(child_thread_id) DO NOTHING &self, page_size: usize, filters: ThreadFilterOptions<'_>, + ) -> anyhow::Result { + self.list_threads_matching(page_size, filters, /*parent_thread_id*/ None) + .await + } + + /// List direct children of `parent_thread_id` using persisted spawn edges. + pub async fn list_threads_by_parent( + &self, + page_size: usize, + parent_thread_id: ThreadId, + filters: ThreadFilterOptions<'_>, + ) -> anyhow::Result { + self.list_threads_matching(page_size, filters, Some(parent_thread_id)) + .await + } + + async fn list_threads_matching( + &self, + page_size: usize, + filters: ThreadFilterOptions<'_>, + parent_thread_id: Option, ) -> anyhow::Result { let limit = page_size.saturating_add(1); let mut builder = QueryBuilder::::new(""); - push_list_threads_query(&mut builder, filters, limit); + push_list_threads_query(&mut builder, filters, parent_thread_id, limit); let rows = builder.build().fetch_all(self.pool.as_ref()).await?; let mut items = rows @@ -1022,11 +1043,19 @@ fn one_thread_id_from_rows( fn push_list_threads_query( builder: &mut QueryBuilder, filters: ThreadFilterOptions<'_>, + parent_thread_id: Option, limit: usize, ) { push_thread_select_columns(builder); builder.push(" FROM threads"); push_thread_filters(builder, filters); + if let Some(parent_thread_id) = parent_thread_id { + builder.push( + " AND threads.id IN (SELECT child_thread_id FROM thread_spawn_edges WHERE parent_thread_id = ", + ); + builder.push_bind(parent_thread_id.to_string()); + builder.push(")"); + } let order_by_index = match filters.cwd_filters { // Multi-cwd listing is supported but at the time of writing has no current use in production. // Preserve its query plan so the global timestamp index does not regress cwd filtering into a scan. @@ -1701,6 +1730,7 @@ mod tests { sort_direction: SortDirection::Desc, search_term: None, }, + /*parent_thread_id*/ None, /*limit*/ 201, ); let plan_details = builder @@ -1729,6 +1759,98 @@ mod tests { } } + #[tokio::test] + async fn list_threads_by_parent_filters_direct_children_with_keyset_pagination() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("state db should initialize"); + let parent_id = ThreadId::new(); + let first_child_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000001").expect("valid thread id"); + let second_child_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000002").expect("valid thread id"); + let grandchild_id = ThreadId::new(); + + for (thread_id, created_at) in [ + (first_child_id, 1_700_000_100), + (second_child_id, 1_700_000_200), + (grandchild_id, 1_700_000_300), + ] { + let mut metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone()); + metadata.created_at = + DateTime::::from_timestamp(created_at, 0).expect("valid timestamp"); + metadata.updated_at = metadata.created_at; + runtime + .upsert_thread(&metadata) + .await + .expect("thread insert should succeed"); + } + for (parent_thread_id, child_thread_id, status) in [ + ( + parent_id, + first_child_id, + DirectionalThreadSpawnEdgeStatus::Open, + ), + ( + parent_id, + second_child_id, + DirectionalThreadSpawnEdgeStatus::Closed, + ), + ( + first_child_id, + grandchild_id, + DirectionalThreadSpawnEdgeStatus::Open, + ), + ] { + runtime + .upsert_thread_spawn_edge(parent_thread_id, child_thread_id, status) + .await + .expect("spawn edge insert should succeed"); + } + + let filters = |anchor| ThreadFilterOptions { + archived_only: false, + allowed_sources: &[], + model_providers: None, + cwd_filters: None, + anchor, + sort_key: SortKey::CreatedAt, + sort_direction: SortDirection::Desc, + search_term: None, + }; + let first_page = runtime + .list_threads_by_parent(/*page_size*/ 1, parent_id, filters(None)) + .await + .expect("first page should succeed"); + let second_page = runtime + .list_threads_by_parent( + /*page_size*/ 1, + parent_id, + filters(first_page.next_anchor.as_ref()), + ) + .await + .expect("second page should succeed"); + + assert_eq!( + first_page + .items + .iter() + .map(|item| item.id) + .collect::>(), + vec![second_child_id] + ); + assert_eq!( + second_page + .items + .iter() + .map(|item| item.id) + .collect::>(), + vec![first_child_id] + ); + assert_eq!(second_page.next_anchor, None); + } + #[tokio::test] async fn apply_rollout_items_restores_memory_mode_from_session_meta() { let codex_home = unique_temp_dir(); diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index 472e3d35e..d02b78d23 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -48,6 +48,10 @@ mod tests { use crate::ListTurnsParams; use crate::SortDirection; use crate::StoredTurnItemsView; + use crate::ThreadPersistenceMetadata; + use crate::ThreadSortKey; + use codex_protocol::models::BaseInstructions; + use codex_protocol::protocol::SessionSource; #[tokio::test] async fn default_turn_pagination_methods_return_unsupported() { @@ -90,6 +94,68 @@ mod tests { } )); } + + #[tokio::test] + async fn list_threads_filters_by_parent_thread_id() { + let store = InMemoryThreadStore::default(); + let parent_thread_id = ThreadId::default(); + let child_thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000001").expect("valid thread id"); + let unrelated_thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000002").expect("valid thread id"); + + for (thread_id, parent_thread_id) in [ + (child_thread_id, Some(parent_thread_id)), + (unrelated_thread_id, None), + ] { + store + .create_thread(CreateThreadParams { + thread_id, + extra_config: None, + forked_from_id: None, + parent_thread_id, + source: SessionSource::Exec, + thread_source: None, + base_instructions: BaseInstructions::default(), + dynamic_tools: Vec::new(), + multi_agent_version: None, + metadata: ThreadPersistenceMetadata { + cwd: None, + model_provider: "test-provider".to_string(), + memory_mode: ThreadMemoryMode::Enabled, + }, + }) + .await + .expect("create thread"); + } + + let page = ThreadStore::list_threads( + &store, + ListThreadsParams { + page_size: 10, + cursor: None, + sort_key: ThreadSortKey::CreatedAt, + sort_direction: SortDirection::Desc, + allowed_sources: Vec::new(), + model_providers: None, + cwd_filters: None, + archived: false, + search_term: None, + parent_thread_id: Some(parent_thread_id), + use_state_db_only: false, + }, + ) + .await + .expect("list child threads"); + + assert_eq!( + page.items + .into_iter() + .map(|item| item.thread_id) + .collect::>(), + vec![child_thread_id] + ); + } } fn stores_guard() -> MutexGuard<'static, HashMap>> { @@ -385,8 +451,15 @@ impl ThreadStore for InMemoryThreadStore { )) } - fn list_threads(&self, _params: ListThreadsParams) -> ThreadStoreFuture<'_, ThreadPage> { - Box::pin(InMemoryThreadStore::list_threads(self)) + fn list_threads(&self, params: ListThreadsParams) -> ThreadStoreFuture<'_, ThreadPage> { + Box::pin(async move { + let mut page = InMemoryThreadStore::list_threads(self).await?; + if let Some(parent_thread_id) = params.parent_thread_id { + page.items + .retain(|thread| thread.parent_thread_id == Some(parent_thread_id)); + } + Ok(page) + }) } fn update_thread_metadata( diff --git a/codex-rs/thread-store/src/local/archive_thread.rs b/codex-rs/thread-store/src/local/archive_thread.rs index 8fb214e98..bbe5c3de5 100644 --- a/codex-rs/thread-store/src/local/archive_thread.rs +++ b/codex-rs/thread-store/src/local/archive_thread.rs @@ -110,6 +110,7 @@ mod tests { cwd_filters: None, archived: true, search_term: None, + parent_thread_id: None, use_state_db_only: false, }) .await diff --git a/codex-rs/thread-store/src/local/create_thread.rs b/codex-rs/thread-store/src/local/create_thread.rs index 8c630f995..05924881d 100644 --- a/codex-rs/thread-store/src/local/create_thread.rs +++ b/codex-rs/thread-store/src/local/create_thread.rs @@ -25,7 +25,7 @@ pub(super) async fn create_thread( model_provider_id: params.metadata.model_provider.clone(), generate_memories: matches!(params.metadata.memory_mode, ThreadMemoryMode::Enabled), }; - let recorder = RolloutRecorder::new( + RolloutRecorder::new( &config, RolloutRecorderParams::new( params.thread_id, @@ -41,7 +41,5 @@ pub(super) async fn create_thread( .await .map_err(|err| ThreadStoreError::Internal { message: format!("failed to initialize local thread recorder: {err}"), - })?; - - Ok(recorder) + }) } diff --git a/codex-rs/thread-store/src/local/list_threads.rs b/codex-rs/thread-store/src/local/list_threads.rs index 0a0767da0..fc3b5034a 100644 --- a/codex-rs/thread-store/src/local/list_threads.rs +++ b/codex-rs/thread-store/src/local/list_threads.rs @@ -116,6 +116,32 @@ pub(super) async fn list_rollout_threads( sort_key: codex_rollout::ThreadSortKey, sort_direction: codex_rollout::SortDirection, ) -> ThreadStoreResult { + if let Some(parent_thread_id) = params.parent_thread_id { + let page = codex_rollout::state_db::list_threads_db( + state_db.as_deref(), + config.codex_home.as_path(), + params.page_size, + cursor, + sort_key, + sort_direction, + params.allowed_sources.as_slice(), + params.model_providers.as_deref(), + params.cwd_filters.as_deref(), + Some(parent_thread_id), + params.archived, + params.search_term.as_deref(), + ) + .await + .ok_or_else(|| ThreadStoreError::Internal { + message: "state DB unavailable for parent-filtered thread listing".to_string(), + })?; + let mut page: codex_rollout::ThreadsPage = page.into(); + for item in &mut page.items { + item.parent_thread_id = Some(parent_thread_id); + } + return Ok(page); + } + let page = if params.use_state_db_only && params.archived { RolloutRecorder::list_archived_threads_from_state_db( state_db, @@ -225,6 +251,7 @@ mod tests { cwd_filters: None, archived: false, search_term: None, + parent_thread_id: None, use_state_db_only: false, }) .await @@ -284,6 +311,7 @@ mod tests { cwd_filters: None, archived: false, search_term: Some("needle".to_string()), + parent_thread_id: None, use_state_db_only: true, }) .await @@ -323,6 +351,7 @@ mod tests { cwd_filters: None, archived: false, search_term: None, + parent_thread_id: None, use_state_db_only: false, }) .await @@ -338,6 +367,7 @@ mod tests { cwd_filters: None, archived: true, search_term: None, + parent_thread_id: None, use_state_db_only: false, }) .await @@ -389,6 +419,7 @@ mod tests { cwd_filters: None, archived: false, search_term: None, + parent_thread_id: None, use_state_db_only: false, }) .await @@ -425,6 +456,7 @@ mod tests { cwd_filters: None, archived: false, search_term: None, + parent_thread_id: None, use_state_db_only: false, }) .await diff --git a/codex-rs/thread-store/src/local/search_threads.rs b/codex-rs/thread-store/src/local/search_threads.rs index c9394c396..1b7f18a32 100644 --- a/codex-rs/thread-store/src/local/search_threads.rs +++ b/codex-rs/thread-store/src/local/search_threads.rs @@ -93,6 +93,7 @@ pub(super) async fn search_threads( cwd_filters: None, archived: params.archived, search_term: None, + parent_thread_id: None, use_state_db_only: state_db.is_some(), }; let mut remaining_rollouts = matching_rollouts; diff --git a/codex-rs/thread-store/src/local/update_thread_metadata.rs b/codex-rs/thread-store/src/local/update_thread_metadata.rs index 7754b1a84..9bde79916 100644 --- a/codex-rs/thread-store/src/local/update_thread_metadata.rs +++ b/codex-rs/thread-store/src/local/update_thread_metadata.rs @@ -1476,6 +1476,7 @@ mod tests { cwd_filters: Some(vec![workspace]), archived: false, search_term: None, + parent_thread_id: None, use_state_db_only: true, }) .await diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 741ca12bc..bb8bf2f6d 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -195,6 +195,8 @@ pub struct ListThreadsParams { pub archived: bool, /// Optional substring/full-text search term for thread title/preview. pub search_term: Option, + /// Optional direct parent thread filter. + pub parent_thread_id: Option, /// Return directly from the state DB without scanning JSONL rollouts to repair metadata. pub use_state_db_only: bool, } diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 80259d892..3ec8bef7c 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -623,6 +623,7 @@ async fn lookup_session_target_by_name_with_app_server( model_providers: None, source_kinds: Some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]), archived: Some(false), + parent_thread_id: None, cwd: None, use_state_db_only: false, search_term: Some(name.to_string()), @@ -735,6 +736,7 @@ fn latest_session_lookup_params( }, source_kinds: Some(resume_source_kinds(include_non_interactive)), archived: Some(false), + parent_thread_id: None, cwd: cwd_filter.map(|cwd| ThreadListCwdFilter::One(cwd.to_string_lossy().to_string())), use_state_db_only: match lookup_mode { LatestSessionLookupMode::StateDbOnly => true, diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index c736fe9b6..18313bcca 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -1821,6 +1821,7 @@ fn thread_list_params( }, source_kinds: Some(crate::resume_source_kinds(include_non_interactive)), archived: Some(false), + parent_thread_id: None, cwd: cwd_filter.map(|cwd| ThreadListCwdFilter::One(cwd.to_string_lossy().into_owned())), use_state_db_only: false, search_term: None, diff --git a/codex-rs/tui/src/session_archive_commands.rs b/codex-rs/tui/src/session_archive_commands.rs index 2867d1933..852aaad5c 100644 --- a/codex-rs/tui/src/session_archive_commands.rs +++ b/codex-rs/tui/src/session_archive_commands.rs @@ -178,6 +178,7 @@ async fn lookup_session_by_exact_name( /*include_non_interactive*/ false, )), archived: Some(archived), + parent_thread_id: None, cwd: None, use_state_db_only: false, search_term: search_term.map(str::to_string),