From 6d15bb3d17f2b2a7d2a11ddd0689dc9244bf744c Mon Sep 17 00:00:00 2001 From: jif Date: Mon, 22 Jun 2026 08:36:08 +0100 Subject: [PATCH] Persist session IDs across thread resume (#29327) ## Summary A cold-resumed subagent kept its durable thread ID but could receive a new session ID, splitting one agent tree across multiple sessions after a restart. Persist the root session ID in every rollout `SessionMeta`, carry it through thread creation, and restore it before initializing the resumed `Session` and `AgentControl`. ## Behavior For a nested agent tree: ```text root session R parent thread P child thread C ``` The child rollout stores: ```text session_id: R parent_thread_id: P id: C ``` After a cold resume, the child still belongs to root session `R` while its immediate parent remains `P`. The integration coverage uses distinct values for all three IDs so it catches restoring the session from `parent_thread_id`. ## Legacy rollouts Previous rollouts have `id` but no `session_id`. `SessionMetaLine` deserialization treats a missing `session_id` as `id`, keeping those files readable, listable, and resumable. When a legacy subagent is resumed through its root, that synthesized child ID no longer overrides the inherited root-scoped `AgentControl`. New rollouts always persist the explicit root session ID. --- .../external_agent_session_import.rs | 1 + .../thread_processor_tests.rs | 3 ++ .../thread_summary_tests.rs | 1 + codex-rs/app-server/tests/common/rollout.rs | 10 ++++- .../tests/suite/conversation_summary.rs | 1 + .../tests/suite/v2/client_metadata.rs | 10 ++++- .../tests/suite/v2/remote_thread_store.rs | 1 + .../app-server/tests/suite/v2/thread_list.rs | 1 + .../app-server/tests/suite/v2/thread_read.rs | 1 + .../tests/suite/v2/thread_resume.rs | 2 + .../tests/suite/v2/thread_unarchive.rs | 1 + codex-rs/cli/src/doctor/thread_inventory.rs | 4 +- .../core/src/personality_migration_tests.rs | 1 + codex-rs/core/src/session/session.rs | 39 +++++++++++++----- codex-rs/core/src/session/tests.rs | 17 ++++++-- codex-rs/core/tests/suite/client.rs | 9 ++++- .../core/tests/suite/personality_migration.rs | 2 + codex-rs/core/tests/suite/review.rs | 1 + .../core/tests/suite/rollout_list_find.rs | 1 + codex-rs/core/tests/suite/sqlite_state.rs | 1 + codex-rs/protocol/src/protocol.rs | 40 ++++++++++++++++++- codex-rs/rollout/src/compression_tests.rs | 1 + codex-rs/rollout/src/metadata_tests.rs | 3 ++ codex-rs/rollout/src/recorder.rs | 22 +++++++--- codex-rs/rollout/src/recorder_tests.rs | 27 +++++++++---- codex-rs/rollout/src/session_index_tests.rs | 1 + codex-rs/rollout/src/state_db_tests.rs | 1 + codex-rs/rollout/src/tests.rs | 8 +++- codex-rs/state/src/extract.rs | 2 + codex-rs/state/src/runtime/threads.rs | 2 + codex-rs/thread-store/src/in_memory.rs | 2 + .../thread-store/src/local/create_thread.rs | 1 + codex-rs/thread-store/src/local/mod.rs | 1 + .../thread-store/src/local/read_thread.rs | 3 ++ .../thread-store/src/local/test_support.rs | 1 + .../thread-store/src/thread_metadata_sync.rs | 1 + codex-rs/thread-store/src/types.rs | 3 ++ codex-rs/tui/src/lib.rs | 1 + 38 files changed, 193 insertions(+), 34 deletions(-) diff --git a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs index d4abb155e..0e17cb573 100644 --- a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs +++ b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs @@ -200,6 +200,7 @@ impl ExternalAgentSessionImporter { }; let now = Utc::now(); let create_params = CreateThreadParams { + session_id: thread_id.into(), thread_id, extra_config: None, forked_from_id: None, diff --git a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs index 0ac9b7fa2..1f5ae12f4 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs @@ -1004,6 +1004,7 @@ mod thread_processor_behavior_tests { let timestamp = "2025-09-05T16:53:11.850Z".to_string(); let session_meta = SessionMeta { + session_id: conversation_id.into(), id: conversation_id, timestamp: timestamp.clone(), model_provider: None, @@ -1060,6 +1061,7 @@ mod thread_processor_behavior_tests { let timestamp = "2025-09-05T16:53:11.850Z".to_string(); let session_meta = SessionMeta { + session_id: parent_thread_id.into(), id: conversation_id, timestamp: timestamp.clone(), source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn { @@ -1110,6 +1112,7 @@ mod thread_processor_behavior_tests { let timestamp = "2025-09-05T16:53:11.850Z".to_string(); let session_meta = SessionMeta { + session_id: conversation_id.into(), id: conversation_id, forked_from_id: Some(forked_from_id), timestamp: timestamp.clone(), diff --git a/codex-rs/app-server/src/request_processors/thread_summary_tests.rs b/codex-rs/app-server/src/request_processors/thread_summary_tests.rs index f8902e132..8bead5529 100644 --- a/codex-rs/app-server/src/request_processors/thread_summary_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_summary_tests.rs @@ -13,6 +13,7 @@ fn extract_conversation_summary_prefers_plain_user_messages() -> Result<()> { let head = vec![ json!({ + "session_id": conversation_id.to_string(), "id": conversation_id.to_string(), "timestamp": timestamp, "cwd": "/", diff --git a/codex-rs/app-server/tests/common/rollout.rs b/codex-rs/app-server/tests/common/rollout.rs index 6d446e1c4..7b389e8fc 100644 --- a/codex-rs/app-server/tests/common/rollout.rs +++ b/codex-rs/app-server/tests/common/rollout.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::GitInfo; @@ -127,11 +128,12 @@ pub fn create_fake_rollout_with_source( model_provider, git_info, source, + /*session_id*/ None, /*parent_thread_id*/ None, ) } -/// Create a minimal rollout file with an explicit session source and control parent. +/// Create a minimal rollout file with an explicit root session and control parent. #[allow(clippy::too_many_arguments)] pub fn create_fake_parented_rollout_with_source( codex_home: &Path, @@ -141,6 +143,7 @@ pub fn create_fake_parented_rollout_with_source( model_provider: Option<&str>, git_info: Option, source: SessionSource, + session_id: SessionId, parent_thread_id: ThreadId, ) -> Result { create_fake_rollout_with_source_and_parent_thread_id( @@ -151,6 +154,7 @@ pub fn create_fake_parented_rollout_with_source( model_provider, git_info, source, + Some(session_id), Some(parent_thread_id), ) } @@ -164,11 +168,13 @@ fn create_fake_rollout_with_source_and_parent_thread_id( model_provider: Option<&str>, git_info: Option, source: SessionSource, + session_id: Option, parent_thread_id: Option, ) -> Result { let uuid = Uuid::new_v4(); let uuid_str = uuid.to_string(); let conversation_id = ThreadId::from_string(&uuid_str)?; + let session_id = session_id.unwrap_or_else(|| conversation_id.into()); let file_path = rollout_path(codex_home, filename_ts, &uuid_str); let dir = file_path @@ -178,6 +184,7 @@ fn create_fake_rollout_with_source_and_parent_thread_id( // Build JSONL lines let meta = SessionMeta { + session_id, id: conversation_id, forked_from_id: None, parent_thread_id, @@ -264,6 +271,7 @@ pub fn create_fake_rollout_with_text_elements( // Build JSONL lines let meta = SessionMeta { + session_id: conversation_id.into(), id: conversation_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/app-server/tests/suite/conversation_summary.rs b/codex-rs/app-server/tests/suite/conversation_summary.rs index 6ad9f1c63..17f73db3e 100644 --- a/codex-rs/app-server/tests/suite/conversation_summary.rs +++ b/codex-rs/app-server/tests/suite/conversation_summary.rs @@ -121,6 +121,7 @@ async fn get_conversation_summary_by_thread_id_reads_pathless_store_thread() -> let thread_id = ThreadId::from_string("00000000-0000-4000-8000-000000000125")?; store .create_thread(CreateThreadParams { + session_id: thread_id.into(), thread_id, extra_config: None, forked_from_id: None, diff --git a/codex-rs/app-server/tests/suite/v2/client_metadata.rs b/codex-rs/app-server/tests/suite/v2/client_metadata.rs index 6b95c036d..e55cd0c24 100644 --- a/codex-rs/app-server/tests/suite/v2/client_metadata.rs +++ b/codex-rs/app-server/tests/suite/v2/client_metadata.rs @@ -300,7 +300,7 @@ async fn review_start_sends_parent_lineage_in_turn_metadata_for_thread_fork_v2() } #[tokio::test] -async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() -> Result<()> { +async fn turn_start_sends_nested_subagent_lineage_after_cold_thread_resume_v2() -> Result<()> { skip_if_no_network!(Ok(())); let server = responses::start_mock_server().await; @@ -321,6 +321,8 @@ async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() - /*supports_websockets*/ false, )?; + let root_thread_id = CoreThreadId::new(); + let root_thread_id_str = root_thread_id.to_string(); let parent_thread_id = CoreThreadId::new(); let parent_thread_id_str = parent_thread_id.to_string(); let subagent_thread_id = create_fake_parented_rollout_with_source( @@ -331,6 +333,7 @@ async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() - Some("mock_provider"), /*git_info*/ None, SessionSource::SubAgent(SubAgentSource::Other("guardian".to_string())), + root_thread_id.into(), parent_thread_id, )?; @@ -350,6 +353,7 @@ async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() - .await??; let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; assert_eq!(thread.id, subagent_thread_id); + assert_eq!(thread.session_id, root_thread_id_str); assert_eq!(thread.parent_thread_id, Some(parent_thread_id_str.clone())); assert_eq!( thread.source, @@ -390,6 +394,10 @@ async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() - Some(parent_thread_id_str.as_str()) ); assert_eq!(metadata["subagent_kind"].as_str(), Some("guardian")); + assert_eq!( + metadata["session_id"].as_str(), + Some(thread.session_id.as_str()) + ); assert_eq!(metadata["thread_id"].as_str(), Some(thread.id.as_str())); assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str())); assert!(metadata.get("forked_from_thread_id").is_none()); diff --git a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs index baf11aff5..6deec70ff 100644 --- a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs +++ b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs @@ -147,6 +147,7 @@ async fn thread_delete_with_non_local_thread_store_does_not_create_local_persist let unloaded_thread_id = ThreadId::from_string(&Uuid::new_v4().to_string())?; thread_store .create_thread(StoreCreateThreadParams { + session_id: unloaded_thread_id.into(), thread_id: unloaded_thread_id, extra_config: None, forked_from_id: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 9bdff3bb8..977951302 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -1252,6 +1252,7 @@ async fn thread_list_filters_by_subagent_variant() -> Result<()> { Some("mock_provider"), /*git_info*/ None, CoreSessionSource::SubAgent(SubAgentSource::Review), + parent_thread_id.into(), parent_thread_id, )?; let compact_id = create_fake_rollout_with_source( diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index a6ed5e4a6..58990aaf0 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -1357,6 +1357,7 @@ async fn seed_pathless_store_thread( ) -> Result<()> { store .create_thread(CreateThreadParams { + session_id: thread_id.into(), thread_id, extra_config: None, forked_from_id: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 07ee66caf..0d1fcd1e5 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -2054,6 +2054,7 @@ stream_max_retries = 0 let rollout_dir = rollout_path.parent().expect("rollout parent directory"); std::fs::create_dir_all(rollout_dir)?; let session_meta = SessionMeta { + session_id: conversation_id.into(), id: conversation_id, forked_from_id: None, parent_thread_id: None, @@ -2735,6 +2736,7 @@ async fn thread_resume_rejects_mismatched_path_for_running_thread_id() -> Result "timestamp": "2025-01-01T00:00:00Z", "type": "session_meta", "payload": { + "session_id": thread_uuid, "id": thread_uuid, "timestamp": "2025-01-01T00:00:00Z", "cwd": codex_home.path(), diff --git a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs index 7267ab073..46a989545 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs @@ -208,6 +208,7 @@ async fn thread_unarchive_preserves_pathless_store_metadata() -> Result<()> { let parent_thread_id = ThreadId::from_string("00000000-0000-4000-8000-000000000127")?; store .create_thread(CreateThreadParams { + session_id: thread_id.into(), thread_id, extra_config: None, forked_from_id: Some(parent_thread_id), diff --git a/codex-rs/cli/src/doctor/thread_inventory.rs b/codex-rs/cli/src/doctor/thread_inventory.rs index 9a8eef6ee..c50e6debd 100644 --- a/codex-rs/cli/src/doctor/thread_inventory.rs +++ b/codex-rs/cli/src/doctor/thread_inventory.rs @@ -816,11 +816,13 @@ mod tests { }; std::fs::create_dir_all(&root).expect("rollout dir"); let path = root.join(format!("rollout-{timestamp}-{thread_id}.jsonl")); + let parsed_thread_id = ThreadId::from_string(thread_id).expect("thread id"); let rollout_line = RolloutLine { timestamp: timestamp.to_string(), item: RolloutItem::SessionMeta(codex_protocol::protocol::SessionMetaLine { meta: codex_protocol::protocol::SessionMeta { - id: ThreadId::from_string(thread_id).expect("thread id"), + session_id: parsed_thread_id.into(), + id: parsed_thread_id, timestamp: timestamp.to_string(), cwd: self.codex_home.path().to_path_buf(), originator: "test".to_string(), diff --git a/codex-rs/core/src/personality_migration_tests.rs b/codex-rs/core/src/personality_migration_tests.rs index 4cf8ab9d8..57f06317a 100644 --- a/codex-rs/core/src/personality_migration_tests.rs +++ b/codex-rs/core/src/personality_migration_tests.rs @@ -43,6 +43,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R let session_meta = SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 0c07f5e95..931742834 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -525,6 +525,33 @@ impl Session { } InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id, }; + let resumed_session_id = match &initial_history { + InitialHistory::Resumed(resumed) => { + resumed.history.iter().find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.session_id), + _ => None, + }) + } + InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None, + }; + // Legacy subagent rollouts synthesize session_id from their own thread id. + let resumed_session_id = resumed_session_id.filter(|session_id| { + !session_configuration.session_source.is_non_root_agent() + || *session_id != SessionId::from(thread_id) + }); + let session_id = resumed_session_id.unwrap_or_else(|| { + if session_configuration.session_source.is_non_root_agent() { + agent_control.session_id() + } else { + SessionId::from(thread_id) + } + }); + let agent_control = agent_control.with_session_id( + session_id, + config + .effective_agent_max_threads(MultiAgentVersion::V2) + .unwrap_or(usize::MAX), + ); let time_provider = crate::current_time::resolve_time_provider( config.current_time_reminder.as_ref(), external_time_provider, @@ -546,6 +573,7 @@ impl Session { let live_thread = match &initial_history { InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => { let params = CreateThreadParams { + session_id, thread_id, extra_config: config.extra_config.clone(), forked_from_id, @@ -952,17 +980,6 @@ impl Session { config.analytics_enabled, ) }); - let session_id = if session_configuration.session_source.is_non_root_agent() { - agent_control.session_id() - } else { - SessionId::from(thread_id) - }; - let agent_control = agent_control.with_session_id( - session_id, - config - .effective_agent_max_threads(MultiAgentVersion::V2) - .unwrap_or(usize::MAX), - ); // Keep one stable manager handle for the session so extension resource clients // automatically observe the manager installed at startup and on later refreshes. let mcp_connection_manager = Arc::new(arc_swap::ArcSwap::from_pointee( diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 6f84da3c7..f3f96b74a 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -1900,6 +1900,7 @@ fn session_meta_item( ) -> RolloutItem { RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, multi_agent_version, ..SessionMeta::default() @@ -3750,6 +3751,7 @@ async fn attach_thread_persistence(session: &mut Session) -> PathBuf { let live_thread = LiveThread::create( Arc::clone(&session.services.thread_store), CreateThreadParams { + session_id: session.session_id(), thread_id: session.thread_id, extra_config: None, forked_from_id: None, @@ -5453,7 +5455,7 @@ async fn resumed_root_session_uses_thread_id_as_session_id() { } #[tokio::test] -async fn resumed_subagent_session_keeps_inherited_session_id() { +async fn resumed_subagent_session_restores_persisted_session_id() { let parent_thread_id = ThreadId::new(); let parent_session_id = SessionId::from(parent_thread_id); let thread_id = ThreadId::new(); @@ -5467,11 +5469,19 @@ async fn resumed_subagent_session_keeps_inherited_session_id() { let (session, rx_event) = make_session_with_history_source_and_agent_control_and_rx( InitialHistory::Resumed(ResumedHistory { conversation_id: thread_id, - history: Vec::new(), + history: vec![RolloutItem::SessionMeta(SessionMetaLine { + meta: SessionMeta { + session_id: parent_session_id, + id: thread_id, + source: session_source.clone(), + ..SessionMeta::default() + }, + git: None, + })], rollout_path: None, }), session_source, - AgentControl::default().with_session_id(parent_session_id, /*max_threads*/ usize::MAX), + AgentControl::default(), ) .await .expect("resume should succeed"); @@ -6590,6 +6600,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() { let live_thread = LiveThread::create( Arc::clone(&thread_store), CreateThreadParams { + session_id: session.session_id(), thread_id: session.thread_id, extra_config: None, forked_from_id: None, diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index fe543a875..f9b2fe42e 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -516,6 +516,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { "timestamp": "2024-01-01T00:00:00.000Z", "type": "session_meta", "payload": { + "session_id": convo_id, "id": convo_id, "timestamp": "2024-01-01T00:00:00Z", "instructions": "be nice", @@ -726,12 +727,14 @@ async fn resume_replays_legacy_js_repl_image_rollout_shapes() { metadata: None, }; let legacy_image_url = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg=="; + let thread_id = ThreadId::default(); let rollout = vec![ RolloutLine { timestamp: "2024-01-01T00:00:00.000Z".to_string(), item: RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { - id: ThreadId::default(), + session_id: thread_id.into(), + id: thread_id, parent_thread_id: None, timestamp: "2024-01-01T00:00:00Z".to_string(), cwd: ".".into(), @@ -860,12 +863,14 @@ async fn resume_replays_image_tool_outputs_with_detail() { let image_url = "data:image/webp;base64,UklGRiIAAABXRUJQVlA4IBYAAAAwAQCdASoBAAEAAUAmJaACdLoB+AADsAD+8ut//NgVzXPv9//S4P0uD9Lg/9KQAAA="; let function_call_id = "view-image-call"; let custom_call_id = "js-repl-call"; + let thread_id = ThreadId::default(); let rollout = vec![ RolloutLine { timestamp: "2024-01-01T00:00:00.000Z".to_string(), item: RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { - id: ThreadId::default(), + session_id: thread_id.into(), + id: thread_id, parent_thread_id: None, timestamp: "2024-01-01T00:00:00Z".to_string(), cwd: ".".into(), diff --git a/codex-rs/core/tests/suite/personality_migration.rs b/codex-rs/core/tests/suite/personality_migration.rs index 530227415..bbab96181 100644 --- a/codex-rs/core/tests/suite/personality_migration.rs +++ b/codex-rs/core/tests/suite/personality_migration.rs @@ -59,6 +59,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R let session_meta = SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, @@ -109,6 +110,7 @@ async fn write_rollout_with_meta_only(dir: &Path, thread_id: ThreadId) -> io::Re let session_meta = SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index 6b0f8552a..99794939d 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -511,6 +511,7 @@ async fn review_input_isolated_from_parent_history() { "timestamp": "2024-01-01T00:00:00.000Z", "type": "session_meta", "payload": { + "session_id": convo_id, "id": convo_id, "timestamp": "2024-01-01T00:00:00Z", "cwd": ".", diff --git a/codex-rs/core/tests/suite/rollout_list_find.rs b/codex-rs/core/tests/suite/rollout_list_find.rs index 680301d9f..1461c1af3 100644 --- a/codex-rs/core/tests/suite/rollout_list_find.rs +++ b/codex-rs/core/tests/suite/rollout_list_find.rs @@ -42,6 +42,7 @@ fn write_minimal_rollout_with_id_at_path(file: &Path, id: Uuid) { "timestamp": "2024-01-01T00:00:00.000Z", "type": "session_meta", "payload": { + "session_id": id, "id": id, "timestamp": "2024-01-01T00:00:00Z", "cwd": ".", diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index eae82de9e..36a51f80c 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -356,6 +356,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { fs::create_dir_all(parent).expect("should create rollout directory"); let session_meta_line = SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 483618f06..5900ffe3c 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -57,7 +57,9 @@ use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_path_uri::PathUri; use schemars::JsonSchema; use serde::Deserialize; +use serde::Deserializer; use serde::Serialize; +use serde::de::Error as _; use serde_json::Value; use serde_with::serde_as; use strum_macros::Display; @@ -2914,6 +2916,7 @@ pub enum MultiAgentVersion { /// and should be used when there is no config override. #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)] pub struct SessionMeta { + pub session_id: SessionId, pub id: ThreadId, #[serde(skip_serializing_if = "Option::is_none")] pub forked_from_id: Option, @@ -2956,8 +2959,10 @@ pub struct SessionMeta { impl Default for SessionMeta { fn default() -> Self { + let id = ThreadId::default(); SessionMeta { - id: ThreadId::default(), + session_id: id.into(), + id, forked_from_id: None, parent_thread_id: None, timestamp: String::new(), @@ -2978,7 +2983,7 @@ impl Default for SessionMeta { } } -#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)] +#[derive(Serialize, Debug, Clone, JsonSchema, TS)] pub struct SessionMetaLine { #[serde(flatten)] pub meta: SessionMeta, @@ -2986,6 +2991,35 @@ pub struct SessionMetaLine { pub git: Option, } +impl<'de> Deserialize<'de> for SessionMetaLine { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + struct SessionMetaLineFields { + #[serde(flatten)] + meta: SessionMeta, + git: Option, + } + + let mut value = Value::deserialize(deserializer)?; + let fields = value + .as_object_mut() + .ok_or_else(|| D::Error::custom("session metadata must be an object"))?; + if !fields.contains_key("session_id") { + let thread_id = fields + .get("id") + .cloned() + .ok_or_else(|| D::Error::missing_field("id"))?; + fields.insert("session_id".to_string(), thread_id); + } + let SessionMetaLineFields { meta, git } = + serde_json::from_value(value).map_err(D::Error::custom)?; + Ok(Self { meta, git }) + } +} + #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)] #[serde(tag = "type", content = "payload", rename_all = "snake_case")] pub enum RolloutItem { @@ -5390,6 +5424,7 @@ mod tests { let thread_id = ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")?; let older_meta = SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, multi_agent_version: Some(MultiAgentVersion::V2), ..Default::default() @@ -5398,6 +5433,7 @@ mod tests { }; let newer_meta_without_version = SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, multi_agent_version: None, ..Default::default() diff --git a/codex-rs/rollout/src/compression_tests.rs b/codex-rs/rollout/src/compression_tests.rs index c2a8b7d7d..60f47d078 100644 --- a/codex-rs/rollout/src/compression_tests.rs +++ b/codex-rs/rollout/src/compression_tests.rs @@ -456,6 +456,7 @@ fn write_rollout(path: &std::path::Path, thread_id: ThreadId, message: &str) -> fs::create_dir_all(parent)?; let session_meta_line = SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/rollout/src/metadata_tests.rs b/codex-rs/rollout/src/metadata_tests.rs index e6771e37c..5f923f16d 100644 --- a/codex-rs/rollout/src/metadata_tests.rs +++ b/codex-rs/rollout/src/metadata_tests.rs @@ -33,6 +33,7 @@ async fn extract_metadata_from_rollout_uses_session_meta() { .join(format!("rollout-2026-01-27T12-34-56-{uuid}.jsonl")); let session_meta = SessionMeta { + session_id: id.into(), id, forked_from_id: None, parent_thread_id: None, @@ -88,6 +89,7 @@ async fn extract_metadata_from_rollout_returns_latest_memory_mode() { .join(format!("rollout-2026-01-27T12-34-56-{uuid}.jsonl")); let session_meta = SessionMeta { + session_id: id.into(), id, forked_from_id: None, parent_thread_id: None, @@ -355,6 +357,7 @@ fn write_rollout_in_sessions_with_cwd( std::fs::create_dir_all(sessions_dir.as_path()).expect("create sessions dir"); let path = sessions_dir.join(format!("rollout-{filename_ts}-{thread_uuid}.jsonl")); let session_meta = SessionMeta { + session_id: id.into(), id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 7ba111e94..084acb0d7 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use std::sync::Mutex; use chrono::SecondsFormat; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::models::BaseInstructions; @@ -81,10 +82,11 @@ pub struct RolloutRecorder { #[derive(Clone)] pub enum RolloutRecorderParams { Create { + session_id: SessionId, conversation_id: ThreadId, forked_from_id: Option, parent_thread_id: Option, - source: SessionSource, + source: Box, thread_source: Option, base_instructions: BaseInstructions, dynamic_tools: Vec, @@ -167,10 +169,11 @@ impl RolloutRecorderParams { dynamic_tools: Vec, ) -> Self { Self::Create { + session_id: conversation_id.into(), conversation_id, forked_from_id, parent_thread_id, - source, + source: Box::new(source), thread_source, base_instructions, dynamic_tools, @@ -178,6 +181,13 @@ impl RolloutRecorderParams { } } + pub fn with_session_id(mut self, session_id: SessionId) -> Self { + if let Self::Create { session_id: id, .. } = &mut self { + *id = session_id; + } + self + } + pub fn with_multi_agent_version( mut self, multi_agent_version: Option, @@ -696,6 +706,7 @@ impl RolloutRecorder { ) -> std::io::Result { let (file, deferred_log_file_info, rollout_path, meta) = match params { RolloutRecorderParams::Create { + session_id, conversation_id, forked_from_id, parent_thread_id, @@ -707,7 +718,7 @@ impl RolloutRecorder { } => { let log_file_info = precompute_log_file_info(config, conversation_id)?; let path = log_file_info.path.clone(); - let session_id = log_file_info.conversation_id; + let thread_id = log_file_info.conversation_id; let started_at = log_file_info.timestamp; let timestamp_format: &[FormatItem] = format_description!( @@ -719,7 +730,8 @@ impl RolloutRecorder { .map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?; let session_meta = SessionMeta { - id: session_id, + session_id, + id: thread_id, forked_from_id, parent_thread_id, timestamp, @@ -729,7 +741,7 @@ impl RolloutRecorder { agent_nickname: source.get_nickname(), agent_role: source.get_agent_role(), agent_path: source.get_agent_path().map(Into::into), - source, + source: *source, thread_source, model_provider: Some(config.model_provider_id().to_string()), base_instructions: Some(base_instructions), diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 8701578eb..738fde899 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -3,6 +3,7 @@ use super::*; use crate::config::RolloutConfig; use chrono::TimeZone; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::AgentMessageEvent; @@ -45,6 +46,7 @@ fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result anyhow::Result<()> { let session_meta_line = SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, @@ -145,7 +148,7 @@ async fn state_db_init_backfills_before_returning() -> anyhow::Result<()> { } #[tokio::test] -async fn load_rollout_items_skips_legacy_ghost_snapshot_lines() -> std::io::Result<()> { +async fn load_rollout_items_defaults_legacy_session_id() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); let rollout_path = home.path().join("rollout.jsonl"); let mut file = File::create(&rollout_path)?; @@ -210,7 +213,10 @@ async fn load_rollout_items_skips_legacy_ghost_snapshot_lines() -> std::io::Resu assert_eq!(loaded_thread_id, Some(thread_id)); assert_eq!(parse_errors, 0); assert_eq!(items.len(), 2); - assert!(matches!(items[0], RolloutItem::SessionMeta(_))); + let RolloutItem::SessionMeta(session_meta) = &items[0] else { + panic!("expected session metadata"); + }; + assert_eq!(session_meta.meta.session_id, SessionId::from(thread_id)); assert!(matches!( items[1], RolloutItem::ResponseItem(ResponseItem::Message { .. }) @@ -234,6 +240,7 @@ async fn load_rollout_items_preserves_legacy_guardian_assessment_lines() -> std: "timestamp": ts, "type": "session_meta", "payload": { + "session_id": thread_id, "id": thread_id, "timestamp": ts, "cwd": ".", @@ -297,6 +304,7 @@ async fn load_rollout_items_filters_legacy_ghost_snapshots_from_compaction_histo "timestamp": ts, "type": "session_meta", "payload": { + "session_id": thread_id, "id": thread_id, "timestamp": ts, "cwd": ".", @@ -365,6 +373,7 @@ async fn load_rollout_items_filters_legacy_ghost_snapshots_from_compaction_histo async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); + let session_id = SessionId::default(); let thread_id = ThreadId::new(); let recorder = RolloutRecorder::new( &config, @@ -376,7 +385,8 @@ async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result< /*thread_source*/ None, BaseInstructions::default(), Vec::new(), - ), + ) + .with_session_id(session_id), ) .await?; @@ -421,10 +431,12 @@ async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result< assert!(rollout_path.exists(), "rollout file should be materialized"); let text = std::fs::read_to_string(&rollout_path)?; - assert!( - text.contains("\"type\":\"session_meta\""), - "expected session metadata in rollout" - ); + let first_line = text.lines().next().expect("session metadata line"); + let session_meta: RolloutLine = serde_json::from_str(first_line)?; + let RolloutItem::SessionMeta(session_meta) = session_meta.item else { + panic!("expected session metadata in rollout"); + }; + assert_eq!(session_meta.meta.session_id, session_id); let buffered_idx = text .find("buffered-event") .expect("buffered event in rollout"); @@ -732,6 +744,7 @@ async fn list_threads_state_db_only_skips_jsonl_repair_scan() -> std::io::Result "timestamp": ts, "type": "session_meta", "payload": { + "session_id": uuid, "id": uuid, "timestamp": ts, "cwd": home.path().display().to_string(), diff --git a/codex-rs/rollout/src/session_index_tests.rs b/codex-rs/rollout/src/session_index_tests.rs index 97496d394..33e4ae4a5 100644 --- a/codex-rs/rollout/src/session_index_tests.rs +++ b/codex-rs/rollout/src/session_index_tests.rs @@ -25,6 +25,7 @@ fn write_rollout_with_metadata(path: &Path, thread_id: ThreadId) -> std::io::Res timestamp: timestamp.clone(), item: RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/rollout/src/state_db_tests.rs b/codex-rs/rollout/src/state_db_tests.rs index ba92ca27f..4b2ae4f25 100644 --- a/codex-rs/rollout/src/state_db_tests.rs +++ b/codex-rs/rollout/src/state_db_tests.rs @@ -158,6 +158,7 @@ fn write_rollout_with_user_message( timestamp: "2026-06-01T14:26:25Z".to_string(), item: RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/rollout/src/tests.rs b/codex-rs/rollout/src/tests.rs index 0e3ae912d..21d712e0d 100644 --- a/codex-rs/rollout/src/tests.rs +++ b/codex-rs/rollout/src/tests.rs @@ -297,6 +297,7 @@ fn write_session_file_with_provider( let mut file = File::create(file_path)?; let mut payload = serde_json::json!({ + "session_id": uuid, "id": uuid, "timestamp": ts_str, "cwd": ".", @@ -370,6 +371,7 @@ fn write_goal_started_session_file( "timestamp": ts_str, "type": "session_meta", "payload": { + "session_id": uuid, "id": uuid, "timestamp": ts_str, "cwd": ".", @@ -451,6 +453,7 @@ fn write_session_file_with_delayed_user_event( Uuid::from_u128(100 + i as u128) }; let payload = serde_json::json!({ + "session_id": uuid, "id": id, "timestamp": ts_str, "cwd": ".", @@ -483,8 +486,9 @@ fn write_session_file_with_meta_payload( root: &Path, ts_str: &str, uuid: Uuid, - payload: serde_json::Value, + mut payload: serde_json::Value, ) -> std::io::Result<()> { + payload["session_id"] = serde_json::json!(uuid); let format: &[FormatItem] = format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]"); let dt = PrimitiveDateTime::parse(ts_str, format) @@ -1088,6 +1092,7 @@ async fn test_get_thread_contents() { "timestamp": ts, "type": "session_meta", "payload": { + "session_id": uuid, "id": uuid, "timestamp": ts, "cwd": ".", @@ -1267,6 +1272,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> { timestamp: ts.to_string(), item: RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { + session_id: conversation_id.into(), id: conversation_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index e156cc911..b419ef328 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -321,6 +321,7 @@ mod tests { &mut metadata, &RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: Some( ThreadId::from_string(&Uuid::now_v7().to_string()).expect("thread id"), @@ -513,6 +514,7 @@ mod tests { &mut metadata, &RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 576f76026..a448dce56 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -1957,6 +1957,7 @@ mod tests { ); let items = vec![RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, @@ -2018,6 +2019,7 @@ mod tests { ); let items = vec![RolloutItem::SessionMeta(SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, forked_from_id: None, parent_thread_id: None, diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index a60409b74..a93e3644c 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -110,6 +110,7 @@ mod tests { ] { store .create_thread(CreateThreadParams { + session_id: thread_id.into(), thread_id, extra_config: None, forked_from_id: None, @@ -231,6 +232,7 @@ impl InMemoryThreadStore { let mut state = self.state.lock().await; state.calls.create_thread += 1; let session_meta = SessionMeta { + session_id: params.session_id, id: params.thread_id, forked_from_id: params.forked_from_id, parent_thread_id: params.parent_thread_id, diff --git a/codex-rs/thread-store/src/local/create_thread.rs b/codex-rs/thread-store/src/local/create_thread.rs index 05924881d..51a96b081 100644 --- a/codex-rs/thread-store/src/local/create_thread.rs +++ b/codex-rs/thread-store/src/local/create_thread.rs @@ -36,6 +36,7 @@ pub(super) async fn create_thread( params.base_instructions, params.dynamic_tools, ) + .with_session_id(params.session_id) .with_multi_agent_version(params.multi_agent_version), ) .await diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index 19a0d2955..e158c901a 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -1123,6 +1123,7 @@ mod tests { fn create_thread_params(thread_id: ThreadId) -> CreateThreadParams { CreateThreadParams { + session_id: thread_id.into(), thread_id, extra_config: None, forked_from_id: None, diff --git a/codex-rs/thread-store/src/local/read_thread.rs b/codex-rs/thread-store/src/local/read_thread.rs index a1e1c5919..1271e4fcf 100644 --- a/codex-rs/thread-store/src/local/read_thread.rs +++ b/codex-rs/thread-store/src/local/read_thread.rs @@ -821,6 +821,7 @@ mod tests { "timestamp": "2025-01-03T12:00:00Z", "type": "session_meta", "payload": { + "session_id": uuid, "id": uuid, "timestamp": "2025-01-03T12:00:00Z", "cwd": rollout_cwd, @@ -926,6 +927,7 @@ mod tests { "timestamp": "2025-01-03T12-00-00", "type": "session_meta", "payload": { + "session_id": uuid, "id": uuid, "timestamp": "2025-01-03T12-00-00", "cwd": home.path(), @@ -1091,6 +1093,7 @@ mod tests { "timestamp": "2025-01-03T12:00:00Z", "type": "session_meta", "payload": { + "session_id": uuid, "id": uuid, "timestamp": "2025-01-03T12:00:00Z", "cwd": home.path(), diff --git a/codex-rs/thread-store/src/local/test_support.rs b/codex-rs/thread-store/src/local/test_support.rs index 98321880f..10a55c4fd 100644 --- a/codex-rs/thread-store/src/local/test_support.rs +++ b/codex-rs/thread-store/src/local/test_support.rs @@ -77,6 +77,7 @@ pub(super) fn write_session_file_with_fork( "timestamp": ts, "type": "session_meta", "payload": { + "session_id": uuid, "id": uuid, "forked_from_id": forked_from_id, "timestamp": ts, diff --git a/codex-rs/thread-store/src/thread_metadata_sync.rs b/codex-rs/thread-store/src/thread_metadata_sync.rs index 0336006fb..7940f1c80 100644 --- a/codex-rs/thread-store/src/thread_metadata_sync.rs +++ b/codex-rs/thread-store/src/thread_metadata_sync.rs @@ -579,6 +579,7 @@ mod tests { fn session_meta(thread_id: ThreadId) -> SessionMetaLine { SessionMetaLine { meta: SessionMeta { + session_id: thread_id.into(), id: thread_id, timestamp: "2025-01-03T12:00:00Z".to_string(), source: SessionSource::Exec, diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 2e3c60723..74412bb9c 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -2,6 +2,7 @@ use std::path::PathBuf; use chrono::DateTime; use chrono::Utc; +use codex_protocol::SessionId; use codex_protocol::ThreadId; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::models::BaseInstructions; @@ -63,6 +64,8 @@ pub struct ExtraConfig {} /// Parameters required to create a persisted thread. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CreateThreadParams { + /// Session id shared by the root thread and all of its subagents. + pub session_id: SessionId, /// Thread id generated by Codex before opening persistence. pub thread_id: ThreadId, /// Optional extra configuration fields for the thread. diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index f189b2314..82d9a72bd 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -2041,6 +2041,7 @@ mod tests { std::fs::create_dir_all(parent)?; let session_meta = codex_protocol::protocol::SessionMeta { + session_id: thread_id.into(), id: thread_id, timestamp: meta_rfc3339.to_string(), cwd: cwd.to_path_buf(),