From ece1dfece07650458f84f3d2b35ffbde29ae48b7 Mon Sep 17 00:00:00 2001 From: Tom Date: Wed, 24 Jun 2026 13:24:10 -0700 Subject: [PATCH] [codex] Inject agent graph store into ThreadManager (#29736) Pick up the AgentGraphStore migration. - Inject an explicit optional agent graph store into `ThreadManager` - Move all calls to spawn, close, recursive resume, and subtree/archive/delete/feedback traversal through it - Keep using `LocalAgentGraphStore` when SQLite is available This required some changes to the interface to deal with futures: - The interface now matches `ThreadStore`'s object-safe pattern by returning a boxed `AgentGraphStoreFuture` directly, allowing `ThreadManager` to hold `Arc` *Slight behavior change!* Unfiltered subtree enumeration now performs a single all-status breadth-first traversal, so a closed grandchild beneath an open edge is included; the previous Open-then-Closed traversals could not cross mixed-status paths and silently omitted it. --- codex-rs/Cargo.lock | 1 + codex-rs/Cargo.toml | 1 - codex-rs/agent-graph-store/Cargo.toml | 2 +- codex-rs/agent-graph-store/src/lib.rs | 1 + codex-rs/agent-graph-store/src/local.rs | 98 +++++++++------- codex-rs/agent-graph-store/src/store.rs | 15 ++- codex-rs/app-server/src/mcp_refresh.rs | 2 +- codex-rs/app-server/src/message_processor.rs | 2 +- .../request_processors/feedback_processor.rs | 22 +--- .../src/request_processors/thread_delete.rs | 19 +--- .../request_processors/thread_processor.rs | 17 +-- codex-rs/core-api/src/lib.rs | 1 + codex-rs/core/Cargo.toml | 1 + codex-rs/core/src/agent/control.rs | 15 ++- codex-rs/core/src/agent/control/legacy.rs | 13 ++- codex-rs/core/src/agent/control/spawn.rs | 49 ++++---- codex-rs/core/src/agent/control_tests.rs | 86 ++++++++++---- codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/prompt_debug.rs | 2 +- codex-rs/core/src/thread_manager.rs | 50 +++++---- codex-rs/core/src/thread_manager_tests.rs | 106 ++++++++++++++++-- .../src/tools/handlers/multi_agents_tests.rs | 3 +- codex-rs/core/tests/common/test_codex.rs | 2 +- codex-rs/core/tests/suite/client.rs | 2 +- codex-rs/mcp-server/src/message_processor.rs | 2 +- codex-rs/thread-manager-sample/src/main.rs | 3 +- 26 files changed, 317 insertions(+), 199 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index f34161c69..7796d4854 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2615,6 +2615,7 @@ dependencies = [ "bm25", "chrono", "clap", + "codex-agent-graph-store", "codex-analytics", "codex-api", "codex-app-server-protocol", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 2bc8ac340..abf5e0411 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -504,7 +504,6 @@ unwrap_used = "deny" # silence the false positive here instead of deleting a real dependency. [workspace.metadata.cargo-shear] ignored = [ - "codex-agent-graph-store", "icu_provider", "openssl-sys", "codex-v8-poc", diff --git a/codex-rs/agent-graph-store/Cargo.toml b/codex-rs/agent-graph-store/Cargo.toml index f676959ad..45bcb84c4 100644 --- a/codex-rs/agent-graph-store/Cargo.toml +++ b/codex-rs/agent-graph-store/Cargo.toml @@ -22,4 +22,4 @@ thiserror = { workspace = true } pretty_assertions = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] } diff --git a/codex-rs/agent-graph-store/src/lib.rs b/codex-rs/agent-graph-store/src/lib.rs index 72e8b45e8..d5f40331b 100644 --- a/codex-rs/agent-graph-store/src/lib.rs +++ b/codex-rs/agent-graph-store/src/lib.rs @@ -9,4 +9,5 @@ pub use error::AgentGraphStoreError; pub use error::AgentGraphStoreResult; pub use local::LocalAgentGraphStore; pub use store::AgentGraphStore; +pub use store::AgentGraphStoreFuture; pub use types::ThreadSpawnEdgeStatus; diff --git a/codex-rs/agent-graph-store/src/local.rs b/codex-rs/agent-graph-store/src/local.rs index b90aaf4f4..56cb1f4ac 100644 --- a/codex-rs/agent-graph-store/src/local.rs +++ b/codex-rs/agent-graph-store/src/local.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use crate::AgentGraphStore; use crate::AgentGraphStoreError; -use crate::AgentGraphStoreResult; +use crate::AgentGraphStoreFuture; use crate::ThreadSpawnEdgeStatus; /// SQLite-backed implementation of [`AgentGraphStore`] using an existing state runtime. @@ -29,65 +29,83 @@ impl LocalAgentGraphStore { } impl AgentGraphStore for LocalAgentGraphStore { - async fn upsert_thread_spawn_edge( + fn upsert_thread_spawn_edge( &self, parent_thread_id: ThreadId, child_thread_id: ThreadId, status: ThreadSpawnEdgeStatus, - ) -> AgentGraphStoreResult<()> { - self.state_db - .upsert_thread_spawn_edge(parent_thread_id, child_thread_id, to_state_status(status)) - .await - .map_err(internal_error) + ) -> AgentGraphStoreFuture<'_, ()> { + Box::pin(async move { + self.state_db + .upsert_thread_spawn_edge( + parent_thread_id, + child_thread_id, + to_state_status(status), + ) + .await + .map_err(internal_error) + }) } - async fn set_thread_spawn_edge_status( + fn set_thread_spawn_edge_status( &self, child_thread_id: ThreadId, status: ThreadSpawnEdgeStatus, - ) -> AgentGraphStoreResult<()> { - self.state_db - .set_thread_spawn_edge_status(child_thread_id, to_state_status(status)) - .await - .map_err(internal_error) + ) -> AgentGraphStoreFuture<'_, ()> { + Box::pin(async move { + self.state_db + .set_thread_spawn_edge_status(child_thread_id, to_state_status(status)) + .await + .map_err(internal_error) + }) } - async fn list_thread_spawn_children( + fn list_thread_spawn_children( &self, parent_thread_id: ThreadId, status_filter: Option, - ) -> AgentGraphStoreResult> { - if let Some(status) = status_filter { - return self - .state_db - .list_thread_spawn_children_with_status(parent_thread_id, to_state_status(status)) - .await - .map_err(internal_error); - } + ) -> AgentGraphStoreFuture<'_, Vec> { + Box::pin(async move { + if let Some(status) = status_filter { + return self + .state_db + .list_thread_spawn_children_with_status( + parent_thread_id, + to_state_status(status), + ) + .await + .map_err(internal_error); + } - self.state_db - .list_thread_spawn_children(parent_thread_id) - .await - .map_err(internal_error) + self.state_db + .list_thread_spawn_children(parent_thread_id) + .await + .map_err(internal_error) + }) } - async fn list_thread_spawn_descendants( + fn list_thread_spawn_descendants( &self, root_thread_id: ThreadId, status_filter: Option, - ) -> AgentGraphStoreResult> { - match status_filter { - Some(status) => self - .state_db - .list_thread_spawn_descendants_with_status(root_thread_id, to_state_status(status)) - .await - .map_err(internal_error), - None => self - .state_db - .list_thread_spawn_descendants(root_thread_id) - .await - .map_err(internal_error), - } + ) -> AgentGraphStoreFuture<'_, Vec> { + Box::pin(async move { + match status_filter { + Some(status) => self + .state_db + .list_thread_spawn_descendants_with_status( + root_thread_id, + to_state_status(status), + ) + .await + .map_err(internal_error), + None => self + .state_db + .list_thread_spawn_descendants(root_thread_id) + .await + .map_err(internal_error), + } + }) } } diff --git a/codex-rs/agent-graph-store/src/store.rs b/codex-rs/agent-graph-store/src/store.rs index 2e0ba2073..0760cb15d 100644 --- a/codex-rs/agent-graph-store/src/store.rs +++ b/codex-rs/agent-graph-store/src/store.rs @@ -1,8 +1,15 @@ +use std::future::Future; +use std::pin::Pin; + use codex_protocol::ThreadId; use crate::AgentGraphStoreResult; use crate::ThreadSpawnEdgeStatus; +/// Future returned by [`AgentGraphStore`] operations. +pub type AgentGraphStoreFuture<'a, T> = + Pin> + Send + 'a>>; + /// Storage-neutral boundary for persisted thread-spawn parent/child topology. /// /// Implementations are expected to return stable ordering for list methods so callers can merge @@ -17,7 +24,7 @@ pub trait AgentGraphStore: Send + Sync { parent_thread_id: ThreadId, child_thread_id: ThreadId, status: ThreadSpawnEdgeStatus, - ) -> impl std::future::Future> + Send; + ) -> AgentGraphStoreFuture<'_, ()>; /// Update the persisted lifecycle status of a spawned thread's incoming edge. /// @@ -26,7 +33,7 @@ pub trait AgentGraphStore: Send + Sync { &self, child_thread_id: ThreadId, status: ThreadSpawnEdgeStatus, - ) -> impl std::future::Future> + Send; + ) -> AgentGraphStoreFuture<'_, ()>; /// List direct spawned children of a parent thread. /// @@ -37,7 +44,7 @@ pub trait AgentGraphStore: Send + Sync { &self, parent_thread_id: ThreadId, status_filter: Option, - ) -> impl std::future::Future>> + Send; + ) -> AgentGraphStoreFuture<'_, Vec>; /// List spawned descendants breadth-first by depth, then by thread id. /// @@ -49,5 +56,5 @@ pub trait AgentGraphStore: Send + Sync { &self, root_thread_id: ThreadId, status_filter: Option, - ) -> impl std::future::Future>> + Send; + ) -> AgentGraphStoreFuture<'_, Vec>; } diff --git a/codex-rs/app-server/src/mcp_refresh.rs b/codex-rs/app-server/src/mcp_refresh.rs index 84a6b8e83..848d201f2 100644 --- a/codex-rs/app-server/src/mcp_refresh.rs +++ b/codex-rs/app-server/src/mcp_refresh.rs @@ -247,7 +247,7 @@ mod tests { )), /*analytics_events_client*/ None, Arc::clone(&thread_store), - Some(state_db.clone()), + codex_core::local_agent_graph_store_from_state_db(Some(&state_db)), "11111111-1111-4111-8111-111111111111".to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 7c9f006d2..41ecbfe56 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -373,7 +373,7 @@ impl MessageProcessor { )), Some(analytics_events_client.clone()), Arc::clone(&thread_store), - state_db.clone(), + codex_core::local_agent_graph_store_from_state_db(state_db.as_ref()), installation_id, Some(app_server_attestation_provider( outgoing.clone(), diff --git a/codex-rs/app-server/src/request_processors/feedback_processor.rs b/codex-rs/app-server/src/request_processors/feedback_processor.rs index 3f2c016d5..5f488f72c 100644 --- a/codex-rs/app-server/src/request_processors/feedback_processor.rs +++ b/codex-rs/app-server/src/request_processors/feedback_processor.rs @@ -102,27 +102,7 @@ impl FeedbackRequestProcessor { warn!( "failed to list feedback subtree for thread_id={conversation_id}: {err}" ); - let mut thread_ids = vec![conversation_id]; - if let Some(state_db_ctx) = state_db_ctx.as_ref() { - for status in [ - codex_state::DirectionalThreadSpawnEdgeStatus::Open, - codex_state::DirectionalThreadSpawnEdgeStatus::Closed, - ] { - match state_db_ctx - .list_thread_spawn_descendants_with_status( - conversation_id, - status, - ) - .await - { - Ok(descendant_ids) => thread_ids.extend(descendant_ids), - Err(err) => warn!( - "failed to list persisted feedback subtree for thread_id={conversation_id}: {err}" - ), - } - } - } - thread_ids + vec![conversation_id] } }, None => Vec::new(), diff --git a/codex-rs/app-server/src/request_processors/thread_delete.rs b/codex-rs/app-server/src/request_processors/thread_delete.rs index fc7e58573..ae78b1f4b 100644 --- a/codex-rs/app-server/src/request_processors/thread_delete.rs +++ b/codex-rs/app-server/src/request_processors/thread_delete.rs @@ -1,6 +1,5 @@ //! `thread/delete` request handling. -use super::thread_processor::core_thread_write_error; use super::thread_processor::unsupported_thread_store_operation; use super::*; @@ -37,23 +36,7 @@ impl ThreadRequestProcessor { let thread_id = ThreadId::from_string(¶ms.thread_id) .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?; - let mut thread_ids = self.state_db_spawn_subtree_thread_ids(thread_id).await?; - let mut seen = thread_ids.iter().copied().collect::>(); - - match self - .thread_manager - .list_agent_subtree_thread_ids(thread_id) - .await - { - Ok(live_thread_ids) => { - for live_thread_id in live_thread_ids { - if seen.insert(live_thread_id) { - thread_ids.push(live_thread_id); - } - } - } - Err(err) => return Err(core_thread_write_error("delete thread", err)), - } + let thread_ids = self.state_db_spawn_subtree_thread_ids(thread_id).await?; self.validate_root_thread_delete(thread_id, thread_ids.len() > 1) .await?; diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index acfa701c4..4703518f7 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1431,25 +1431,14 @@ impl ThreadRequestProcessor { &self, thread_id: ThreadId, ) -> Result, JSONRPCErrorError> { - let mut thread_ids = vec![thread_id]; - let Some(state_db_ctx) = self.state_db.as_ref() else { - return Ok(thread_ids); - }; - let mut seen = HashSet::from([thread_id]); - let descendants = state_db_ctx - .list_thread_spawn_descendants(thread_id) + self.thread_manager + .list_agent_subtree_thread_ids(thread_id) .await .map_err(|err| { internal_error(format!( "failed to list spawned descendants for thread id {thread_id}: {err}" )) - })?; - for descendant_id in descendants { - if seen.insert(descendant_id) { - thread_ids.push(descendant_id); - } - } - Ok(thread_ids) + }) } async fn thread_increment_elicitation_inner( diff --git a/codex-rs/core-api/src/lib.rs b/codex-rs/core-api/src/lib.rs index de3a7de90..c7cfff8a9 100644 --- a/codex-rs/core-api/src/lib.rs +++ b/codex-rs/core-api/src/lib.rs @@ -44,6 +44,7 @@ pub use codex_core::config::TerminalResizeReflowConfig; pub use codex_core::config::ThreadStoreConfig; pub use codex_core::config::find_codex_home; pub use codex_core::init_state_db; +pub use codex_core::local_agent_graph_store_from_state_db; pub use codex_core::resolve_installation_id; pub use codex_core::skills::SkillsService; pub use codex_core::thread_store_from_config; diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index d16e8d9cb..4435a2eb6 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -24,6 +24,7 @@ bm25 = { workspace = true } chrono = { workspace = true, features = ["serde"] } clap = { workspace = true, features = ["derive"] } codex-analytics = { workspace = true } +codex-agent-graph-store = { workspace = true } codex-api = { workspace = true } codex-app-server-protocol = { workspace = true } codex-apply-patch = { workspace = true } diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 2f8d03543..9b010f7cd 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -36,7 +36,6 @@ use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::user_input::UserInput; -use codex_state::DirectionalThreadSpawnEdgeStatus; use codex_thread_store::ReadThreadParams; use serde::Serialize; use std::collections::HashMap; @@ -639,7 +638,7 @@ impl AgentControl { async fn persist_thread_spawn_edge_for_source( &self, - thread: &crate::CodexThread, + child_thread: &crate::CodexThread, child_thread_id: ThreadId, session_source: Option<&SessionSource>, ) { @@ -647,14 +646,20 @@ impl AgentControl { else { return; }; - let Some(state_db_ctx) = thread.state_db() else { + if child_thread.config_snapshot().await.ephemeral { + return; + } + let Ok(state) = self.upgrade() else { return; }; - if let Err(err) = state_db_ctx + let Some(agent_graph_store) = state.agent_graph_store() else { + return; + }; + if let Err(err) = agent_graph_store .upsert_thread_spawn_edge( parent_thread_id, child_thread_id, - DirectionalThreadSpawnEdgeStatus::Open, + codex_agent_graph_store::ThreadSpawnEdgeStatus::Open, ) .await { diff --git a/codex-rs/core/src/agent/control/legacy.rs b/codex-rs/core/src/agent/control/legacy.rs index aa4147268..c6c6a6c38 100644 --- a/codex-rs/core/src/agent/control/legacy.rs +++ b/codex-rs/core/src/agent/control/legacy.rs @@ -31,11 +31,12 @@ impl AgentControl { let known_agent = self.state.agent_metadata_for_thread(agent_id).is_some(); match state.get_thread(agent_id).await { Ok(thread) => { - if let Some(state_db_ctx) = thread.state_db() - && let Err(err) = state_db_ctx + if !thread.config_snapshot().await.ephemeral + && let Some(agent_graph_store) = state.agent_graph_store() + && let Err(err) = agent_graph_store .set_thread_spawn_edge_status( agent_id, - DirectionalThreadSpawnEdgeStatus::Closed, + codex_agent_graph_store::ThreadSpawnEdgeStatus::Closed, ) .await { @@ -43,11 +44,11 @@ impl AgentControl { } } Err(CodexErr::ThreadNotFound(_)) if known_agent => { - if let Some(state_db_ctx) = state.state_db() - && let Err(err) = state_db_ctx + if let Some(agent_graph_store) = state.agent_graph_store() + && let Err(err) = agent_graph_store .set_thread_spawn_edge_status( agent_id, - DirectionalThreadSpawnEdgeStatus::Closed, + codex_agent_graph_store::ThreadSpawnEdgeStatus::Closed, ) .await { diff --git a/codex-rs/core/src/agent/control/spawn.rs b/codex-rs/core/src/agent/control/spawn.rs index cdb3d352a..ad2f00bf2 100644 --- a/codex-rs/core/src/agent/control/spawn.rs +++ b/codex-rs/core/src/agent/control/spawn.rs @@ -545,19 +545,16 @@ impl AgentControl { { return Ok(resumed_thread_id); } - let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else { - return Ok(resumed_thread_id); - }; - let Some(state_db_ctx) = resumed_thread.state_db() else { + let Some(agent_graph_store) = state.agent_graph_store() else { return Ok(resumed_thread_id); }; let mut resume_queue = VecDeque::from([(thread_id, root_depth)]); while let Some((parent_thread_id, parent_depth)) = resume_queue.pop_front() { - let child_ids = match state_db_ctx - .list_thread_spawn_children_with_status( + let child_ids = match agent_graph_store + .list_thread_spawn_children( parent_thread_id, - DirectionalThreadSpawnEdgeStatus::Open, + Some(codex_agent_graph_store::ThreadSpawnEdgeStatus::Open), ) .await { @@ -613,7 +610,6 @@ impl AgentControl { session_source: SessionSource, ) -> CodexResult<(ThreadId, MultiAgentVersion)> { let state = self.upgrade()?; - let state_db_ctx = state.state_db(); let stored_thread = state .read_stored_thread(ReadThreadParams { thread_id, @@ -621,6 +617,14 @@ impl AgentControl { include_history: true, }) .await?; + let resumed_agent_path = stored_thread + .agent_path + .as_deref() + .map(AgentPath::try_from) + .transpose() + .map_err(|err| CodexErr::InvalidRequest(format!("invalid stored agent path: {err}")))?; + let resumed_agent_nickname = stored_thread.agent_nickname.clone(); + let resumed_agent_role = stored_thread.agent_role.clone(); let history = stored_thread .history .ok_or_else(|| CodexErr::ThreadNotFound(thread_id))? @@ -649,26 +653,15 @@ impl AgentControl { agent_path, agent_role: _, agent_nickname: _, - }) => { - let (resumed_agent_nickname, resumed_agent_role) = - if let Some(state_db_ctx) = state_db_ctx.as_ref() { - match state_db_ctx.get_thread(thread_id).await { - Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role), - Ok(None) | Err(_) => (None, None), - } - } else { - (None, None) - }; - self.prepare_thread_spawn( - &mut reservation, - &config, - parent_thread_id, - depth, - agent_path, - resumed_agent_role, - resumed_agent_nickname, - )? - } + }) => self.prepare_thread_spawn( + &mut reservation, + &config, + parent_thread_id, + depth, + agent_path.or(resumed_agent_path), + resumed_agent_role, + resumed_agent_nickname, + )?, other => (other, AgentMetadata::default()), }; let notification_source = session_source.clone(); diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 05b824253..8edc0ee61 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -12,7 +12,9 @@ use crate::init_state_db; use crate::thread_manager::StartThreadOptions; use assert_matches::assert_matches; use codex_extension_api::ExtensionDataInit; +use codex_extension_api::empty_extension_registry; use codex_features::Feature; +use codex_login::AuthManager; use codex_login::CodexAuth; use codex_protocol::AgentPath; use codex_protocol::config_types::ModeKind; @@ -33,6 +35,7 @@ use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_thread_store::ArchiveThreadParams; +use codex_thread_store::InMemoryThreadStore; use codex_thread_store::LocalThreadStore; use codex_thread_store::LocalThreadStoreConfig; use codex_thread_store::ThreadStore; @@ -849,6 +852,42 @@ async fn spawn_agent_creates_thread_and_sends_prompt() { assert_eq!(captured, Some(expected)); } +#[tokio::test] +async fn ephemeral_spawn_does_not_persist_agent_graph_edge() { + let (home, mut config) = test_config().await; + config.ephemeral = true; + let harness = AgentControlHarness::new_with_config(home, config).await; + let (parent_thread_id, _parent_thread) = harness.start_thread().await; + let child_thread_id = harness + .control + .spawn_agent( + harness.config.clone(), + text_input("spawned"), + Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: None, + })), + ) + .await + .expect("ephemeral agent spawn should succeed"); + + let persisted_children = harness + .state_db + .as_ref() + .expect("manager should retain state db") + .list_thread_spawn_children(parent_thread_id) + .await + .expect("persisted child list should load"); + assert_eq!(persisted_children, Vec::::new()); + assert!( + harness.manager.get_thread(child_thread_id).await.is_ok(), + "ephemeral child should remain live" + ); +} + #[tokio::test] async fn spawn_thread_subagent_uses_supplied_initial_multi_agent_mode_without_history() { let harness = AgentControlHarness::new().await; @@ -2354,24 +2393,27 @@ async fn spawn_thread_subagent_uses_role_specific_nickname_candidates() { #[tokio::test] async fn resume_thread_subagent_restores_stored_metadata_and_effective_multi_agent_mode() { - let (home, mut config) = test_config().await; - config - .features - .enable(Feature::Sqlite) - .expect("test config should allow sqlite"); - let state_db = init_state_db(&config).await; - let manager = ThreadManager::with_models_provider_home_and_state_for_tests( - CodexAuth::from_api_key("dummy"), - config.model_provider.clone(), - config.codex_home.to_path_buf(), - std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), - state_db.clone(), + let (home, config) = test_config().await; + let thread_store = Arc::new(InMemoryThreadStore::default()); + let manager = ThreadManager::new( + &config, + AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")), + SessionSource::Exec, + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + empty_extension_registry(), + Arc::new(crate::test_support::EmptyUserInstructionsProvider), + /*analytics_events_client*/ None, + thread_store.clone(), + /*agent_graph_store*/ None, + uuid::Uuid::new_v4().to_string(), + /*attestation_provider*/ None, + /*external_time_provider*/ None, ); let control = manager.agent_control(); let harness = AgentControlHarness { _home: home, config, - state_db, + state_db: None, manager, control, }; @@ -2457,14 +2499,18 @@ async fn resume_thread_subagent_restores_stored_metadata_and_effective_multi_age .session_source .get_nickname() .expect("spawned sub-agent should have a nickname"); - let state_db = child_thread - .state_db() - .expect("sqlite state db should be available for nickname resume test"); timeout(Duration::from_secs(5), async { loop { - if let Ok(Some(metadata)) = state_db.get_thread(child_thread_id).await - && metadata.agent_nickname.is_some() - && metadata.agent_role.as_deref() == Some("explorer") + if let Ok(stored_thread) = thread_store + .read_thread(ReadThreadParams { + thread_id: child_thread_id, + include_archived: true, + include_history: false, + }) + .await + && stored_thread.agent_nickname.is_some() + && stored_thread.agent_role.as_deref() == Some("explorer") + && stored_thread.agent_path.as_deref() == Some(agent_path.as_str()) { break; } @@ -2488,7 +2534,7 @@ async fn resume_thread_subagent_restores_stored_metadata_and_effective_multi_age SessionSource::SubAgent(SubAgentSource::ThreadSpawn { parent_thread_id, depth: 1, - agent_path: Some(agent_path.clone()), + agent_path: None, agent_nickname: None, agent_role: None, }), diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 519d11d1c..da9195406 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -118,6 +118,7 @@ pub use thread_manager::StartThreadOptions; pub use thread_manager::ThreadManager; pub use thread_manager::ThreadShutdownReport; pub use thread_manager::build_models_manager; +pub use thread_manager::local_agent_graph_store_from_state_db; pub use thread_manager::thread_store_from_config; pub use web_search::web_search_action_detail; pub use web_search::web_search_detail; diff --git a/codex-rs/core/src/prompt_debug.rs b/codex-rs/core/src/prompt_debug.rs index a3d1cfaf0..f5941bc10 100644 --- a/codex-rs/core/src/prompt_debug.rs +++ b/codex-rs/core/src/prompt_debug.rs @@ -58,7 +58,7 @@ pub async fn build_prompt_input( user_instructions_provider, /*analytics_events_client*/ None, thread_store, - state_db.clone(), + crate::local_agent_graph_store_from_state_db(state_db.as_ref()), installation_id, /*attestation_provider*/ None, /*external_time_provider*/ None, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 75a02d999..4846bfd89 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -16,6 +16,8 @@ use crate::session::INITIAL_SUBMIT_ID; use crate::session::resolve_multi_agent_version; use crate::tasks::InterruptedTurnHistoryMarker; use crate::tasks::interrupted_turn_history_marker; +use codex_agent_graph_store::AgentGraphStore; +use codex_agent_graph_store::LocalAgentGraphStore; use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::TurnStatus; @@ -58,7 +60,6 @@ use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_rollout::state_db::StateDbHandle; -use codex_state::DirectionalThreadSpawnEdgeStatus; use codex_thread_store::InMemoryThreadStore; use codex_thread_store::LocalThreadStore; use codex_thread_store::LocalThreadStoreConfig; @@ -241,12 +242,12 @@ pub(crate) struct ThreadManagerState { extensions: Arc>, user_instructions_provider: Arc, thread_store: Arc, + agent_graph_store: Option>, attestation_provider: Option>, external_time_provider: Option>, session_source: SessionSource, installation_id: String, analytics_events_client: Option, - state_db: Option, // Captures submitted ops for testing purpose when test mode is enabled. ops_log: Option, } @@ -283,6 +284,15 @@ pub fn thread_store_from_config( } } +/// Construct the default SQLite-backed agent graph store when local state is available. +pub fn local_agent_graph_store_from_state_db( + state_db: Option<&StateDbHandle>, +) -> Option> { + state_db.map(|state_db| { + Arc::new(LocalAgentGraphStore::new(Arc::clone(state_db))) as Arc + }) +} + impl ThreadManager { #[allow(clippy::too_many_arguments)] pub fn new( @@ -294,7 +304,7 @@ impl ThreadManager { user_instructions_provider: Arc, analytics_events_client: Option, thread_store: Arc, - state_db: Option, + agent_graph_store: Option>, installation_id: String, attestation_provider: Option>, external_time_provider: Option>, @@ -328,13 +338,13 @@ impl ThreadManager { extensions, user_instructions_provider, thread_store, + agent_graph_store, attestation_provider, external_time_provider, auth_manager, session_source, installation_id, analytics_events_client, - state_db, ops_log: should_use_test_thread_manager_behavior() .then(|| Arc::new(std::sync::Mutex::new(Vec::new()))), }), @@ -419,6 +429,7 @@ impl ThreadManager { }, state_db.clone(), )); + let agent_graph_store = local_agent_graph_store_from_state_db(state_db.as_ref()); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), @@ -434,13 +445,13 @@ impl ThreadManager { crate::test_support::EmptyUserInstructionsProvider, ), thread_store, + agent_graph_store, attestation_provider: None, external_time_provider: None, auth_manager, session_source: SessionSource::Exec, installation_id, analytics_events_client: None, - state_db, ops_log: should_use_test_thread_manager_behavior() .then(|| Arc::new(std::sync::Mutex::new(Vec::new()))), }), @@ -579,21 +590,16 @@ impl ThreadManager { subtree_thread_ids.push(thread_id); seen_thread_ids.insert(thread_id); - if let Some(state_db_ctx) = self.state.state_db() { - for status in [ - DirectionalThreadSpawnEdgeStatus::Open, - DirectionalThreadSpawnEdgeStatus::Closed, - ] { - for descendant_id in state_db_ctx - .list_thread_spawn_descendants_with_status(thread_id, status) - .await - .map_err(|err| { - CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}")) - })? - { - if seen_thread_ids.insert(descendant_id) { - subtree_thread_ids.push(descendant_id); - } + if let Some(agent_graph_store) = self.state.agent_graph_store() { + for descendant_id in agent_graph_store + .list_thread_spawn_descendants(thread_id, /*status_filter*/ None) + .await + .map_err(|err| { + CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}")) + })? + { + if seen_thread_ids.insert(descendant_id) { + subtree_thread_ids.push(descendant_id); } } } @@ -1062,8 +1068,8 @@ impl ThreadManager { } impl ThreadManagerState { - pub(crate) fn state_db(&self) -> Option { - self.state_db.clone() + pub(crate) fn agent_graph_store(&self) -> Option> { + self.agent_graph_store.clone() } pub(crate) async fn list_thread_ids(&self) -> Vec { diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index e6b222f6b..b1f5c4e24 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -35,6 +35,49 @@ use wiremock::MockServer; const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111"; +struct FakeAgentGraphStore { + root_thread_id: ThreadId, + descendant_thread_ids: Vec, +} + +impl codex_agent_graph_store::AgentGraphStore for FakeAgentGraphStore { + fn upsert_thread_spawn_edge( + &self, + _parent_thread_id: ThreadId, + _child_thread_id: ThreadId, + _status: codex_agent_graph_store::ThreadSpawnEdgeStatus, + ) -> codex_agent_graph_store::AgentGraphStoreFuture<'_, ()> { + Box::pin(async { panic!("unexpected graph upsert") }) + } + + fn set_thread_spawn_edge_status( + &self, + _child_thread_id: ThreadId, + _status: codex_agent_graph_store::ThreadSpawnEdgeStatus, + ) -> codex_agent_graph_store::AgentGraphStoreFuture<'_, ()> { + Box::pin(async { panic!("unexpected graph status update") }) + } + + fn list_thread_spawn_children( + &self, + _parent_thread_id: ThreadId, + _status_filter: Option, + ) -> codex_agent_graph_store::AgentGraphStoreFuture<'_, Vec> { + Box::pin(async { panic!("unexpected direct-child listing") }) + } + + fn list_thread_spawn_descendants( + &self, + root_thread_id: ThreadId, + status_filter: Option, + ) -> codex_agent_graph_store::AgentGraphStoreFuture<'_, Vec> { + assert_eq!(root_thread_id, self.root_thread_id); + assert_eq!(status_filter, None); + let descendant_thread_ids = self.descendant_thread_ids.clone(); + Box::pin(async move { Ok(descendant_thread_ids) }) + } +} + fn user_msg(text: &str) -> ResponseItem { ResponseItem::Message { id: None, @@ -477,7 +520,7 @@ async fn start_thread_seeds_extension_data_for_mcp_and_lifecycle_contributors() Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, /*state_db*/ None), - /*state_db*/ None, + /*agent_graph_store*/ None, TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -588,7 +631,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() { Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, /*state_db*/ None), - /*state_db*/ None, + /*agent_graph_store*/ None, TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -713,7 +756,7 @@ async fn explicit_installation_id_skips_codex_home_file() { Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store, - state_db.clone(), + local_agent_graph_store_from_state_db(state_db.as_ref()), installation_id.clone(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -754,7 +797,7 @@ async fn resume_active_thread_from_rollout_returns_running_thread() { Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, /*state_db*/ None), - /*state_db*/ None, + /*agent_graph_store*/ None, TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -814,7 +857,7 @@ async fn resume_stopped_thread_from_rollout_spawns_new_thread() { Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, /*state_db*/ None), - /*state_db*/ None, + /*agent_graph_store*/ None, TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -881,7 +924,7 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() { Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store, - state_db.clone(), + local_agent_graph_store_from_state_db(state_db.as_ref()), TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -948,6 +991,47 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() { .expect("shutdown resumed thread"); } +#[tokio::test] +async fn subtree_listing_uses_injected_graph_store_without_state_db() { + let temp_dir = tempdir().expect("tempdir"); + let mut config = test_config().await; + config.codex_home = temp_dir.path().join("codex-home").abs(); + std::fs::create_dir_all(&config.codex_home).expect("create codex home"); + + let root_thread_id = ThreadId::new(); + let descendant_thread_ids = vec![ThreadId::new(), ThreadId::new()]; + let agent_graph_store = Arc::new(FakeAgentGraphStore { + root_thread_id, + descendant_thread_ids: descendant_thread_ids.clone(), + }); + let auth_manager = + AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let manager = ThreadManager::new( + &config, + auth_manager, + SessionSource::Exec, + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + empty_extension_registry(), + Arc::new(crate::test_support::EmptyUserInstructionsProvider), + /*analytics_events_client*/ None, + thread_store_from_config(&config, /*state_db*/ None), + Some(agent_graph_store), + TEST_INSTALLATION_ID.to_string(), + /*attestation_provider*/ None, + /*external_time_provider*/ None, + ); + + let mut expected_thread_ids = vec![root_thread_id]; + expected_thread_ids.extend(descendant_thread_ids); + assert_eq!( + manager + .list_agent_subtree_thread_ids(root_thread_id) + .await + .expect("subtree should load from injected graph store"), + expected_thread_ids + ); +} + #[tokio::test] async fn rollout_path_resume_and_fork_read_history_through_thread_store() { let temp_dir = tempdir().expect("tempdir"); @@ -976,7 +1060,7 @@ async fn rollout_path_resume_and_fork_read_history_through_thread_store() { Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store.clone(), - state_db, + local_agent_graph_store_from_state_db(state_db.as_ref()), TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -1081,7 +1165,7 @@ async fn new_uses_active_provider_for_model_refresh() { Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, /*state_db*/ None), - /*state_db*/ None, + /*agent_graph_store*/ None, TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -1303,7 +1387,7 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, state_db.clone()), - state_db.clone(), + local_agent_graph_store_from_state_db(state_db.as_ref()), TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -1412,7 +1496,7 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() { Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, state_db.clone()), - state_db.clone(), + local_agent_graph_store_from_state_db(state_db.as_ref()), TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, @@ -1511,7 +1595,7 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_ Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, state_db.clone()), - state_db.clone(), + local_agent_graph_store_from_state_db(state_db.as_ref()), TEST_INSTALLATION_ID.to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index 1faa3b416..ccfb465b7 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -4,6 +4,7 @@ use crate::config::AgentRoleConfig; use crate::config::DEFAULT_AGENT_MAX_DEPTH; use crate::function_tool::FunctionCallError; use crate::init_state_db; +use crate::local_agent_graph_store_from_state_db; use crate::session::step_context::StepContext; use crate::session::tests::make_session_and_context; use crate::session_prefix::format_inter_agent_completion_message; @@ -4267,7 +4268,7 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr Arc::new(crate::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, state_db.clone()), - state_db.clone(), + local_agent_graph_store_from_state_db(state_db.as_ref()), "11111111-1111-4111-8111-111111111111".to_string(), /*attestation_provider*/ None, /*external_time_provider*/ None, diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index e40f37e3c..6a09c0f65 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -608,7 +608,7 @@ impl TestCodexBuilder { user_instructions_provider, /*analytics_events_client*/ None, thread_store, - state_db.clone(), + codex_core::local_agent_graph_store_from_state_db(state_db.as_ref()), installation_id, /*attestation_provider*/ None, /*external_time_provider*/ self.external_time_provider.clone(), diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 98303d09b..f934a2434 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -1464,7 +1464,7 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() { Arc::new(codex_core::test_support::EmptyUserInstructionsProvider), /*analytics_events_client*/ None, thread_store_from_config(&config, /*state_db*/ None), - /*state_db*/ None, + /*agent_graph_store*/ None, installation_id, /*attestation_provider*/ None, /*external_time_provider*/ None, diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index 5f36091fe..d211ff0a3 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -75,7 +75,7 @@ impl MessageProcessor { user_instructions_provider, /*analytics_events_client*/ None, codex_core::thread_store_from_config(config.as_ref(), state_db.clone()), - state_db.clone(), + codex_core::local_agent_graph_store_from_state_db(state_db.as_ref()), installation_id, /*attestation_provider*/ None, /*external_time_provider*/ None, diff --git a/codex-rs/thread-manager-sample/src/main.rs b/codex-rs/thread-manager-sample/src/main.rs index 3d8561a17..4237a33fe 100644 --- a/codex-rs/thread-manager-sample/src/main.rs +++ b/codex-rs/thread-manager-sample/src/main.rs @@ -59,6 +59,7 @@ use codex_core_api::empty_extension_registry; use codex_core_api::find_codex_home; use codex_core_api::init_state_db; use codex_core_api::item_event_to_server_notification; +use codex_core_api::local_agent_graph_store_from_state_db; use codex_core_api::resolve_installation_id; use codex_core_api::set_default_originator; use codex_core_api::thread_store_from_config; @@ -133,7 +134,7 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { user_instructions_provider, /*analytics_events_client*/ None, Arc::clone(&thread_store), - state_db, + local_agent_graph_store_from_state_db(state_db.as_ref()), installation_id, /*attestation_provider*/ None, /*external_time_provider*/ None,