[codex] Inject agent graph store into ThreadManager (#29736)

Pick up the AgentGraphStore migration.

- Inject an explicit optional agent graph store into `ThreadManager` 
- Move all calls to spawn, close, recursive resume, and
subtree/archive/delete/feedback traversal through it
- Keep using  `LocalAgentGraphStore` when SQLite is available

This required some changes to the interface to deal with futures:

- The interface now matches `ThreadStore`'s object-safe pattern by
returning a boxed `AgentGraphStoreFuture` directly, allowing
`ThreadManager` to hold `Arc<dyn AgentGraphStore>`

*Slight behavior change!* Unfiltered subtree enumeration now performs a
single all-status breadth-first traversal, so a closed grandchild
beneath an open edge is included; the previous Open-then-Closed
traversals could not cross mixed-status paths and silently omitted it.
This commit is contained in:
Tom
2026-06-24 13:24:10 -07:00
committed by GitHub
Unverified
parent 989f55defa
commit ece1dfece0
26 changed files with 317 additions and 199 deletions
+1
View File
@@ -2615,6 +2615,7 @@ dependencies = [
"bm25",
"chrono",
"clap",
"codex-agent-graph-store",
"codex-analytics",
"codex-api",
"codex-app-server-protocol",
-1
View File
@@ -504,7 +504,6 @@ unwrap_used = "deny"
# silence the false positive here instead of deleting a real dependency.
[workspace.metadata.cargo-shear]
ignored = [
"codex-agent-graph-store",
"icu_provider",
"openssl-sys",
"codex-v8-poc",
+1 -1
View File
@@ -22,4 +22,4 @@ thiserror = { workspace = true }
pretty_assertions = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] }
+1
View File
@@ -9,4 +9,5 @@ pub use error::AgentGraphStoreError;
pub use error::AgentGraphStoreResult;
pub use local::LocalAgentGraphStore;
pub use store::AgentGraphStore;
pub use store::AgentGraphStoreFuture;
pub use types::ThreadSpawnEdgeStatus;
+58 -40
View File
@@ -4,7 +4,7 @@ use std::sync::Arc;
use crate::AgentGraphStore;
use crate::AgentGraphStoreError;
use crate::AgentGraphStoreResult;
use crate::AgentGraphStoreFuture;
use crate::ThreadSpawnEdgeStatus;
/// SQLite-backed implementation of [`AgentGraphStore`] using an existing state runtime.
@@ -29,65 +29,83 @@ impl LocalAgentGraphStore {
}
impl AgentGraphStore for LocalAgentGraphStore {
async fn upsert_thread_spawn_edge(
fn upsert_thread_spawn_edge(
&self,
parent_thread_id: ThreadId,
child_thread_id: ThreadId,
status: ThreadSpawnEdgeStatus,
) -> AgentGraphStoreResult<()> {
self.state_db
.upsert_thread_spawn_edge(parent_thread_id, child_thread_id, to_state_status(status))
.await
.map_err(internal_error)
) -> AgentGraphStoreFuture<'_, ()> {
Box::pin(async move {
self.state_db
.upsert_thread_spawn_edge(
parent_thread_id,
child_thread_id,
to_state_status(status),
)
.await
.map_err(internal_error)
})
}
async fn set_thread_spawn_edge_status(
fn set_thread_spawn_edge_status(
&self,
child_thread_id: ThreadId,
status: ThreadSpawnEdgeStatus,
) -> AgentGraphStoreResult<()> {
self.state_db
.set_thread_spawn_edge_status(child_thread_id, to_state_status(status))
.await
.map_err(internal_error)
) -> AgentGraphStoreFuture<'_, ()> {
Box::pin(async move {
self.state_db
.set_thread_spawn_edge_status(child_thread_id, to_state_status(status))
.await
.map_err(internal_error)
})
}
async fn list_thread_spawn_children(
fn list_thread_spawn_children(
&self,
parent_thread_id: ThreadId,
status_filter: Option<ThreadSpawnEdgeStatus>,
) -> AgentGraphStoreResult<Vec<ThreadId>> {
if let Some(status) = status_filter {
return self
.state_db
.list_thread_spawn_children_with_status(parent_thread_id, to_state_status(status))
.await
.map_err(internal_error);
}
) -> AgentGraphStoreFuture<'_, Vec<ThreadId>> {
Box::pin(async move {
if let Some(status) = status_filter {
return self
.state_db
.list_thread_spawn_children_with_status(
parent_thread_id,
to_state_status(status),
)
.await
.map_err(internal_error);
}
self.state_db
.list_thread_spawn_children(parent_thread_id)
.await
.map_err(internal_error)
self.state_db
.list_thread_spawn_children(parent_thread_id)
.await
.map_err(internal_error)
})
}
async fn list_thread_spawn_descendants(
fn list_thread_spawn_descendants(
&self,
root_thread_id: ThreadId,
status_filter: Option<ThreadSpawnEdgeStatus>,
) -> AgentGraphStoreResult<Vec<ThreadId>> {
match status_filter {
Some(status) => self
.state_db
.list_thread_spawn_descendants_with_status(root_thread_id, to_state_status(status))
.await
.map_err(internal_error),
None => self
.state_db
.list_thread_spawn_descendants(root_thread_id)
.await
.map_err(internal_error),
}
) -> AgentGraphStoreFuture<'_, Vec<ThreadId>> {
Box::pin(async move {
match status_filter {
Some(status) => self
.state_db
.list_thread_spawn_descendants_with_status(
root_thread_id,
to_state_status(status),
)
.await
.map_err(internal_error),
None => self
.state_db
.list_thread_spawn_descendants(root_thread_id)
.await
.map_err(internal_error),
}
})
}
}
+11 -4
View File
@@ -1,8 +1,15 @@
use std::future::Future;
use std::pin::Pin;
use codex_protocol::ThreadId;
use crate::AgentGraphStoreResult;
use crate::ThreadSpawnEdgeStatus;
/// Future returned by [`AgentGraphStore`] operations.
pub type AgentGraphStoreFuture<'a, T> =
Pin<Box<dyn Future<Output = AgentGraphStoreResult<T>> + Send + 'a>>;
/// Storage-neutral boundary for persisted thread-spawn parent/child topology.
///
/// Implementations are expected to return stable ordering for list methods so callers can merge
@@ -17,7 +24,7 @@ pub trait AgentGraphStore: Send + Sync {
parent_thread_id: ThreadId,
child_thread_id: ThreadId,
status: ThreadSpawnEdgeStatus,
) -> impl std::future::Future<Output = AgentGraphStoreResult<()>> + Send;
) -> AgentGraphStoreFuture<'_, ()>;
/// Update the persisted lifecycle status of a spawned thread's incoming edge.
///
@@ -26,7 +33,7 @@ pub trait AgentGraphStore: Send + Sync {
&self,
child_thread_id: ThreadId,
status: ThreadSpawnEdgeStatus,
) -> impl std::future::Future<Output = AgentGraphStoreResult<()>> + Send;
) -> AgentGraphStoreFuture<'_, ()>;
/// List direct spawned children of a parent thread.
///
@@ -37,7 +44,7 @@ pub trait AgentGraphStore: Send + Sync {
&self,
parent_thread_id: ThreadId,
status_filter: Option<ThreadSpawnEdgeStatus>,
) -> impl std::future::Future<Output = AgentGraphStoreResult<Vec<ThreadId>>> + Send;
) -> AgentGraphStoreFuture<'_, Vec<ThreadId>>;
/// List spawned descendants breadth-first by depth, then by thread id.
///
@@ -49,5 +56,5 @@ pub trait AgentGraphStore: Send + Sync {
&self,
root_thread_id: ThreadId,
status_filter: Option<ThreadSpawnEdgeStatus>,
) -> impl std::future::Future<Output = AgentGraphStoreResult<Vec<ThreadId>>> + Send;
) -> AgentGraphStoreFuture<'_, Vec<ThreadId>>;
}
+1 -1
View File
@@ -247,7 +247,7 @@ mod tests {
)),
/*analytics_events_client*/ None,
Arc::clone(&thread_store),
Some(state_db.clone()),
codex_core::local_agent_graph_store_from_state_db(Some(&state_db)),
"11111111-1111-4111-8111-111111111111".to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
+1 -1
View File
@@ -373,7 +373,7 @@ impl MessageProcessor {
)),
Some(analytics_events_client.clone()),
Arc::clone(&thread_store),
state_db.clone(),
codex_core::local_agent_graph_store_from_state_db(state_db.as_ref()),
installation_id,
Some(app_server_attestation_provider(
outgoing.clone(),
@@ -102,27 +102,7 @@ impl FeedbackRequestProcessor {
warn!(
"failed to list feedback subtree for thread_id={conversation_id}: {err}"
);
let mut thread_ids = vec![conversation_id];
if let Some(state_db_ctx) = state_db_ctx.as_ref() {
for status in [
codex_state::DirectionalThreadSpawnEdgeStatus::Open,
codex_state::DirectionalThreadSpawnEdgeStatus::Closed,
] {
match state_db_ctx
.list_thread_spawn_descendants_with_status(
conversation_id,
status,
)
.await
{
Ok(descendant_ids) => thread_ids.extend(descendant_ids),
Err(err) => warn!(
"failed to list persisted feedback subtree for thread_id={conversation_id}: {err}"
),
}
}
}
thread_ids
vec![conversation_id]
}
},
None => Vec::new(),
@@ -1,6 +1,5 @@
//! `thread/delete` request handling.
use super::thread_processor::core_thread_write_error;
use super::thread_processor::unsupported_thread_store_operation;
use super::*;
@@ -37,23 +36,7 @@ impl ThreadRequestProcessor {
let thread_id = ThreadId::from_string(&params.thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;
let mut thread_ids = self.state_db_spawn_subtree_thread_ids(thread_id).await?;
let mut seen = thread_ids.iter().copied().collect::<HashSet<_>>();
match self
.thread_manager
.list_agent_subtree_thread_ids(thread_id)
.await
{
Ok(live_thread_ids) => {
for live_thread_id in live_thread_ids {
if seen.insert(live_thread_id) {
thread_ids.push(live_thread_id);
}
}
}
Err(err) => return Err(core_thread_write_error("delete thread", err)),
}
let thread_ids = self.state_db_spawn_subtree_thread_ids(thread_id).await?;
self.validate_root_thread_delete(thread_id, thread_ids.len() > 1)
.await?;
@@ -1431,25 +1431,14 @@ impl ThreadRequestProcessor {
&self,
thread_id: ThreadId,
) -> Result<Vec<ThreadId>, JSONRPCErrorError> {
let mut thread_ids = vec![thread_id];
let Some(state_db_ctx) = self.state_db.as_ref() else {
return Ok(thread_ids);
};
let mut seen = HashSet::from([thread_id]);
let descendants = state_db_ctx
.list_thread_spawn_descendants(thread_id)
self.thread_manager
.list_agent_subtree_thread_ids(thread_id)
.await
.map_err(|err| {
internal_error(format!(
"failed to list spawned descendants for thread id {thread_id}: {err}"
))
})?;
for descendant_id in descendants {
if seen.insert(descendant_id) {
thread_ids.push(descendant_id);
}
}
Ok(thread_ids)
})
}
async fn thread_increment_elicitation_inner(
+1
View File
@@ -44,6 +44,7 @@ pub use codex_core::config::TerminalResizeReflowConfig;
pub use codex_core::config::ThreadStoreConfig;
pub use codex_core::config::find_codex_home;
pub use codex_core::init_state_db;
pub use codex_core::local_agent_graph_store_from_state_db;
pub use codex_core::resolve_installation_id;
pub use codex_core::skills::SkillsService;
pub use codex_core::thread_store_from_config;
+1
View File
@@ -24,6 +24,7 @@ bm25 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["derive"] }
codex-analytics = { workspace = true }
codex-agent-graph-store = { workspace = true }
codex-api = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-apply-patch = { workspace = true }
+10 -5
View File
@@ -36,7 +36,6 @@ use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_protocol::user_input::UserInput;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_thread_store::ReadThreadParams;
use serde::Serialize;
use std::collections::HashMap;
@@ -639,7 +638,7 @@ impl AgentControl {
async fn persist_thread_spawn_edge_for_source(
&self,
thread: &crate::CodexThread,
child_thread: &crate::CodexThread,
child_thread_id: ThreadId,
session_source: Option<&SessionSource>,
) {
@@ -647,14 +646,20 @@ impl AgentControl {
else {
return;
};
let Some(state_db_ctx) = thread.state_db() else {
if child_thread.config_snapshot().await.ephemeral {
return;
}
let Ok(state) = self.upgrade() else {
return;
};
if let Err(err) = state_db_ctx
let Some(agent_graph_store) = state.agent_graph_store() else {
return;
};
if let Err(err) = agent_graph_store
.upsert_thread_spawn_edge(
parent_thread_id,
child_thread_id,
DirectionalThreadSpawnEdgeStatus::Open,
codex_agent_graph_store::ThreadSpawnEdgeStatus::Open,
)
.await
{
+7 -6
View File
@@ -31,11 +31,12 @@ impl AgentControl {
let known_agent = self.state.agent_metadata_for_thread(agent_id).is_some();
match state.get_thread(agent_id).await {
Ok(thread) => {
if let Some(state_db_ctx) = thread.state_db()
&& let Err(err) = state_db_ctx
if !thread.config_snapshot().await.ephemeral
&& let Some(agent_graph_store) = state.agent_graph_store()
&& let Err(err) = agent_graph_store
.set_thread_spawn_edge_status(
agent_id,
DirectionalThreadSpawnEdgeStatus::Closed,
codex_agent_graph_store::ThreadSpawnEdgeStatus::Closed,
)
.await
{
@@ -43,11 +44,11 @@ impl AgentControl {
}
}
Err(CodexErr::ThreadNotFound(_)) if known_agent => {
if let Some(state_db_ctx) = state.state_db()
&& let Err(err) = state_db_ctx
if let Some(agent_graph_store) = state.agent_graph_store()
&& let Err(err) = agent_graph_store
.set_thread_spawn_edge_status(
agent_id,
DirectionalThreadSpawnEdgeStatus::Closed,
codex_agent_graph_store::ThreadSpawnEdgeStatus::Closed,
)
.await
{
+21 -28
View File
@@ -545,19 +545,16 @@ impl AgentControl {
{
return Ok(resumed_thread_id);
}
let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else {
return Ok(resumed_thread_id);
};
let Some(state_db_ctx) = resumed_thread.state_db() else {
let Some(agent_graph_store) = state.agent_graph_store() else {
return Ok(resumed_thread_id);
};
let mut resume_queue = VecDeque::from([(thread_id, root_depth)]);
while let Some((parent_thread_id, parent_depth)) = resume_queue.pop_front() {
let child_ids = match state_db_ctx
.list_thread_spawn_children_with_status(
let child_ids = match agent_graph_store
.list_thread_spawn_children(
parent_thread_id,
DirectionalThreadSpawnEdgeStatus::Open,
Some(codex_agent_graph_store::ThreadSpawnEdgeStatus::Open),
)
.await
{
@@ -613,7 +610,6 @@ impl AgentControl {
session_source: SessionSource,
) -> CodexResult<(ThreadId, MultiAgentVersion)> {
let state = self.upgrade()?;
let state_db_ctx = state.state_db();
let stored_thread = state
.read_stored_thread(ReadThreadParams {
thread_id,
@@ -621,6 +617,14 @@ impl AgentControl {
include_history: true,
})
.await?;
let resumed_agent_path = stored_thread
.agent_path
.as_deref()
.map(AgentPath::try_from)
.transpose()
.map_err(|err| CodexErr::InvalidRequest(format!("invalid stored agent path: {err}")))?;
let resumed_agent_nickname = stored_thread.agent_nickname.clone();
let resumed_agent_role = stored_thread.agent_role.clone();
let history = stored_thread
.history
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?
@@ -649,26 +653,15 @@ impl AgentControl {
agent_path,
agent_role: _,
agent_nickname: _,
}) => {
let (resumed_agent_nickname, resumed_agent_role) =
if let Some(state_db_ctx) = state_db_ctx.as_ref() {
match state_db_ctx.get_thread(thread_id).await {
Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role),
Ok(None) | Err(_) => (None, None),
}
} else {
(None, None)
};
self.prepare_thread_spawn(
&mut reservation,
&config,
parent_thread_id,
depth,
agent_path,
resumed_agent_role,
resumed_agent_nickname,
)?
}
}) => self.prepare_thread_spawn(
&mut reservation,
&config,
parent_thread_id,
depth,
agent_path.or(resumed_agent_path),
resumed_agent_role,
resumed_agent_nickname,
)?,
other => (other, AgentMetadata::default()),
};
let notification_source = session_source.clone();
+66 -20
View File
@@ -12,7 +12,9 @@ use crate::init_state_db;
use crate::thread_manager::StartThreadOptions;
use assert_matches::assert_matches;
use codex_extension_api::ExtensionDataInit;
use codex_extension_api::empty_extension_registry;
use codex_features::Feature;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_protocol::AgentPath;
use codex_protocol::config_types::ModeKind;
@@ -33,6 +35,7 @@ use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_thread_store::ArchiveThreadParams;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::LocalThreadStore;
use codex_thread_store::LocalThreadStoreConfig;
use codex_thread_store::ThreadStore;
@@ -849,6 +852,42 @@ async fn spawn_agent_creates_thread_and_sends_prompt() {
assert_eq!(captured, Some(expected));
}
#[tokio::test]
async fn ephemeral_spawn_does_not_persist_agent_graph_edge() {
let (home, mut config) = test_config().await;
config.ephemeral = true;
let harness = AgentControlHarness::new_with_config(home, config).await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("spawned"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
})),
)
.await
.expect("ephemeral agent spawn should succeed");
let persisted_children = harness
.state_db
.as_ref()
.expect("manager should retain state db")
.list_thread_spawn_children(parent_thread_id)
.await
.expect("persisted child list should load");
assert_eq!(persisted_children, Vec::<ThreadId>::new());
assert!(
harness.manager.get_thread(child_thread_id).await.is_ok(),
"ephemeral child should remain live"
);
}
#[tokio::test]
async fn spawn_thread_subagent_uses_supplied_initial_multi_agent_mode_without_history() {
let harness = AgentControlHarness::new().await;
@@ -2354,24 +2393,27 @@ async fn spawn_thread_subagent_uses_role_specific_nickname_candidates() {
#[tokio::test]
async fn resume_thread_subagent_restores_stored_metadata_and_effective_multi_agent_mode() {
let (home, mut config) = test_config().await;
config
.features
.enable(Feature::Sqlite)
.expect("test config should allow sqlite");
let state_db = init_state_db(&config).await;
let manager = ThreadManager::with_models_provider_home_and_state_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
state_db.clone(),
let (home, config) = test_config().await;
let thread_store = Arc::new(InMemoryThreadStore::default());
let manager = ThreadManager::new(
&config,
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
empty_extension_registry(),
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store.clone(),
/*agent_graph_store*/ None,
uuid::Uuid::new_v4().to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
);
let control = manager.agent_control();
let harness = AgentControlHarness {
_home: home,
config,
state_db,
state_db: None,
manager,
control,
};
@@ -2457,14 +2499,18 @@ async fn resume_thread_subagent_restores_stored_metadata_and_effective_multi_age
.session_source
.get_nickname()
.expect("spawned sub-agent should have a nickname");
let state_db = child_thread
.state_db()
.expect("sqlite state db should be available for nickname resume test");
timeout(Duration::from_secs(5), async {
loop {
if let Ok(Some(metadata)) = state_db.get_thread(child_thread_id).await
&& metadata.agent_nickname.is_some()
&& metadata.agent_role.as_deref() == Some("explorer")
if let Ok(stored_thread) = thread_store
.read_thread(ReadThreadParams {
thread_id: child_thread_id,
include_archived: true,
include_history: false,
})
.await
&& stored_thread.agent_nickname.is_some()
&& stored_thread.agent_role.as_deref() == Some("explorer")
&& stored_thread.agent_path.as_deref() == Some(agent_path.as_str())
{
break;
}
@@ -2488,7 +2534,7 @@ async fn resume_thread_subagent_restores_stored_metadata_and_effective_multi_age
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: Some(agent_path.clone()),
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
+1
View File
@@ -118,6 +118,7 @@ pub use thread_manager::StartThreadOptions;
pub use thread_manager::ThreadManager;
pub use thread_manager::ThreadShutdownReport;
pub use thread_manager::build_models_manager;
pub use thread_manager::local_agent_graph_store_from_state_db;
pub use thread_manager::thread_store_from_config;
pub use web_search::web_search_action_detail;
pub use web_search::web_search_detail;
+1 -1
View File
@@ -58,7 +58,7 @@ pub async fn build_prompt_input(
user_instructions_provider,
/*analytics_events_client*/ None,
thread_store,
state_db.clone(),
crate::local_agent_graph_store_from_state_db(state_db.as_ref()),
installation_id,
/*attestation_provider*/ None,
/*external_time_provider*/ None,
+28 -22
View File
@@ -16,6 +16,8 @@ use crate::session::INITIAL_SUBMIT_ID;
use crate::session::resolve_multi_agent_version;
use crate::tasks::InterruptedTurnHistoryMarker;
use crate::tasks::interrupted_turn_history_marker;
use codex_agent_graph_store::AgentGraphStore;
use codex_agent_graph_store::LocalAgentGraphStore;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::TurnStatus;
@@ -58,7 +60,6 @@ use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_protocol::protocol::W3cTraceContext;
use codex_rollout::state_db::StateDbHandle;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::LocalThreadStore;
use codex_thread_store::LocalThreadStoreConfig;
@@ -241,12 +242,12 @@ pub(crate) struct ThreadManagerState {
extensions: Arc<ExtensionRegistry<Config>>,
user_instructions_provider: Arc<dyn UserInstructionsProvider>,
thread_store: Arc<dyn ThreadStore>,
agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
external_time_provider: Option<Arc<dyn TimeProvider>>,
session_source: SessionSource,
installation_id: String,
analytics_events_client: Option<AnalyticsEventsClient>,
state_db: Option<StateDbHandle>,
// Captures submitted ops for testing purpose when test mode is enabled.
ops_log: Option<SharedCapturedOps>,
}
@@ -283,6 +284,15 @@ pub fn thread_store_from_config(
}
}
/// Construct the default SQLite-backed agent graph store when local state is available.
pub fn local_agent_graph_store_from_state_db(
state_db: Option<&StateDbHandle>,
) -> Option<Arc<dyn AgentGraphStore>> {
state_db.map(|state_db| {
Arc::new(LocalAgentGraphStore::new(Arc::clone(state_db))) as Arc<dyn AgentGraphStore>
})
}
impl ThreadManager {
#[allow(clippy::too_many_arguments)]
pub fn new(
@@ -294,7 +304,7 @@ impl ThreadManager {
user_instructions_provider: Arc<dyn UserInstructionsProvider>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
state_db: Option<StateDbHandle>,
agent_graph_store: Option<Arc<dyn AgentGraphStore>>,
installation_id: String,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
external_time_provider: Option<Arc<dyn TimeProvider>>,
@@ -328,13 +338,13 @@ impl ThreadManager {
extensions,
user_instructions_provider,
thread_store,
agent_graph_store,
attestation_provider,
external_time_provider,
auth_manager,
session_source,
installation_id,
analytics_events_client,
state_db,
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
@@ -419,6 +429,7 @@ impl ThreadManager {
},
state_db.clone(),
));
let agent_graph_store = local_agent_graph_store_from_state_db(state_db.as_ref());
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
@@ -434,13 +445,13 @@ impl ThreadManager {
crate::test_support::EmptyUserInstructionsProvider,
),
thread_store,
agent_graph_store,
attestation_provider: None,
external_time_provider: None,
auth_manager,
session_source: SessionSource::Exec,
installation_id,
analytics_events_client: None,
state_db,
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
@@ -579,21 +590,16 @@ impl ThreadManager {
subtree_thread_ids.push(thread_id);
seen_thread_ids.insert(thread_id);
if let Some(state_db_ctx) = self.state.state_db() {
for status in [
DirectionalThreadSpawnEdgeStatus::Open,
DirectionalThreadSpawnEdgeStatus::Closed,
] {
for descendant_id in state_db_ctx
.list_thread_spawn_descendants_with_status(thread_id, status)
.await
.map_err(|err| {
CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}"))
})?
{
if seen_thread_ids.insert(descendant_id) {
subtree_thread_ids.push(descendant_id);
}
if let Some(agent_graph_store) = self.state.agent_graph_store() {
for descendant_id in agent_graph_store
.list_thread_spawn_descendants(thread_id, /*status_filter*/ None)
.await
.map_err(|err| {
CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}"))
})?
{
if seen_thread_ids.insert(descendant_id) {
subtree_thread_ids.push(descendant_id);
}
}
}
@@ -1062,8 +1068,8 @@ impl ThreadManager {
}
impl ThreadManagerState {
pub(crate) fn state_db(&self) -> Option<StateDbHandle> {
self.state_db.clone()
pub(crate) fn agent_graph_store(&self) -> Option<Arc<dyn AgentGraphStore>> {
self.agent_graph_store.clone()
}
pub(crate) async fn list_thread_ids(&self) -> Vec<ThreadId> {
+95 -11
View File
@@ -35,6 +35,49 @@ use wiremock::MockServer;
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
struct FakeAgentGraphStore {
root_thread_id: ThreadId,
descendant_thread_ids: Vec<ThreadId>,
}
impl codex_agent_graph_store::AgentGraphStore for FakeAgentGraphStore {
fn upsert_thread_spawn_edge(
&self,
_parent_thread_id: ThreadId,
_child_thread_id: ThreadId,
_status: codex_agent_graph_store::ThreadSpawnEdgeStatus,
) -> codex_agent_graph_store::AgentGraphStoreFuture<'_, ()> {
Box::pin(async { panic!("unexpected graph upsert") })
}
fn set_thread_spawn_edge_status(
&self,
_child_thread_id: ThreadId,
_status: codex_agent_graph_store::ThreadSpawnEdgeStatus,
) -> codex_agent_graph_store::AgentGraphStoreFuture<'_, ()> {
Box::pin(async { panic!("unexpected graph status update") })
}
fn list_thread_spawn_children(
&self,
_parent_thread_id: ThreadId,
_status_filter: Option<codex_agent_graph_store::ThreadSpawnEdgeStatus>,
) -> codex_agent_graph_store::AgentGraphStoreFuture<'_, Vec<ThreadId>> {
Box::pin(async { panic!("unexpected direct-child listing") })
}
fn list_thread_spawn_descendants(
&self,
root_thread_id: ThreadId,
status_filter: Option<codex_agent_graph_store::ThreadSpawnEdgeStatus>,
) -> codex_agent_graph_store::AgentGraphStoreFuture<'_, Vec<ThreadId>> {
assert_eq!(root_thread_id, self.root_thread_id);
assert_eq!(status_filter, None);
let descendant_thread_ids = self.descendant_thread_ids.clone();
Box::pin(async move { Ok(descendant_thread_ids) })
}
}
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
@@ -477,7 +520,7 @@ async fn start_thread_seeds_extension_data_for_mcp_and_lifecycle_contributors()
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
/*agent_graph_store*/ None,
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -588,7 +631,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
/*agent_graph_store*/ None,
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -713,7 +756,7 @@ async fn explicit_installation_id_skips_codex_home_file() {
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store,
state_db.clone(),
local_agent_graph_store_from_state_db(state_db.as_ref()),
installation_id.clone(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -754,7 +797,7 @@ async fn resume_active_thread_from_rollout_returns_running_thread() {
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
/*agent_graph_store*/ None,
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -814,7 +857,7 @@ async fn resume_stopped_thread_from_rollout_spawns_new_thread() {
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
/*agent_graph_store*/ None,
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -881,7 +924,7 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() {
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store,
state_db.clone(),
local_agent_graph_store_from_state_db(state_db.as_ref()),
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -948,6 +991,47 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() {
.expect("shutdown resumed thread");
}
#[tokio::test]
async fn subtree_listing_uses_injected_graph_store_without_state_db() {
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config().await;
config.codex_home = temp_dir.path().join("codex-home").abs();
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
let root_thread_id = ThreadId::new();
let descendant_thread_ids = vec![ThreadId::new(), ThreadId::new()];
let agent_graph_store = Arc::new(FakeAgentGraphStore {
root_thread_id,
descendant_thread_ids: descendant_thread_ids.clone(),
});
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let manager = ThreadManager::new(
&config,
auth_manager,
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
empty_extension_registry(),
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
Some(agent_graph_store),
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
);
let mut expected_thread_ids = vec![root_thread_id];
expected_thread_ids.extend(descendant_thread_ids);
assert_eq!(
manager
.list_agent_subtree_thread_ids(root_thread_id)
.await
.expect("subtree should load from injected graph store"),
expected_thread_ids
);
}
#[tokio::test]
async fn rollout_path_resume_and_fork_read_history_through_thread_store() {
let temp_dir = tempdir().expect("tempdir");
@@ -976,7 +1060,7 @@ async fn rollout_path_resume_and_fork_read_history_through_thread_store() {
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store.clone(),
state_db,
local_agent_graph_store_from_state_db(state_db.as_ref()),
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -1081,7 +1165,7 @@ async fn new_uses_active_provider_for_model_refresh() {
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
/*agent_graph_store*/ None,
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -1303,7 +1387,7 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, state_db.clone()),
state_db.clone(),
local_agent_graph_store_from_state_db(state_db.as_ref()),
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -1412,7 +1496,7 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, state_db.clone()),
state_db.clone(),
local_agent_graph_store_from_state_db(state_db.as_ref()),
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -1511,7 +1595,7 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, state_db.clone()),
state_db.clone(),
local_agent_graph_store_from_state_db(state_db.as_ref()),
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
@@ -4,6 +4,7 @@ use crate::config::AgentRoleConfig;
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
use crate::function_tool::FunctionCallError;
use crate::init_state_db;
use crate::local_agent_graph_store_from_state_db;
use crate::session::step_context::StepContext;
use crate::session::tests::make_session_and_context;
use crate::session_prefix::format_inter_agent_completion_message;
@@ -4267,7 +4268,7 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr
Arc::new(crate::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, state_db.clone()),
state_db.clone(),
local_agent_graph_store_from_state_db(state_db.as_ref()),
"11111111-1111-4111-8111-111111111111".to_string(),
/*attestation_provider*/ None,
/*external_time_provider*/ None,
+1 -1
View File
@@ -608,7 +608,7 @@ impl TestCodexBuilder {
user_instructions_provider,
/*analytics_events_client*/ None,
thread_store,
state_db.clone(),
codex_core::local_agent_graph_store_from_state_db(state_db.as_ref()),
installation_id,
/*attestation_provider*/ None,
/*external_time_provider*/ self.external_time_provider.clone(),
+1 -1
View File
@@ -1464,7 +1464,7 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
Arc::new(codex_core::test_support::EmptyUserInstructionsProvider),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
/*state_db*/ None,
/*agent_graph_store*/ None,
installation_id,
/*attestation_provider*/ None,
/*external_time_provider*/ None,
+1 -1
View File
@@ -75,7 +75,7 @@ impl MessageProcessor {
user_instructions_provider,
/*analytics_events_client*/ None,
codex_core::thread_store_from_config(config.as_ref(), state_db.clone()),
state_db.clone(),
codex_core::local_agent_graph_store_from_state_db(state_db.as_ref()),
installation_id,
/*attestation_provider*/ None,
/*external_time_provider*/ None,
+2 -1
View File
@@ -59,6 +59,7 @@ use codex_core_api::empty_extension_registry;
use codex_core_api::find_codex_home;
use codex_core_api::init_state_db;
use codex_core_api::item_event_to_server_notification;
use codex_core_api::local_agent_graph_store_from_state_db;
use codex_core_api::resolve_installation_id;
use codex_core_api::set_default_originator;
use codex_core_api::thread_store_from_config;
@@ -133,7 +134,7 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
user_instructions_provider,
/*analytics_events_client*/ None,
Arc::clone(&thread_store),
state_db,
local_agent_graph_store_from_state_db(state_db.as_ref()),
installation_id,
/*attestation_provider*/ None,
/*external_time_provider*/ None,