Persist session IDs across thread resume (#29327)

## Summary

A cold-resumed subagent kept its durable thread ID but could receive a
new session ID, splitting one agent tree across multiple sessions after
a restart.

Persist the root session ID in every rollout `SessionMeta`, carry it
through thread creation, and restore it before initializing the resumed
`Session` and `AgentControl`.

## Behavior

For a nested agent tree:

```text
root session R
  parent thread P
    child thread C
```

The child rollout stores:

```text
session_id:       R
parent_thread_id: P
id:               C
```

After a cold resume, the child still belongs to root session `R` while
its immediate parent remains `P`. The integration coverage uses distinct
values for all three IDs so it catches restoring the session from
`parent_thread_id`.

## Legacy rollouts

Previous rollouts have `id` but no `session_id`. `SessionMetaLine`
deserialization treats a missing `session_id` as `id`, keeping those
files readable, listable, and resumable. When a legacy subagent is
resumed through its root, that synthesized child ID no longer overrides
the inherited root-scoped `AgentControl`. New rollouts always persist
the explicit root session ID.
This commit is contained in:
jif
2026-06-22 08:36:08 +01:00
committed by GitHub
Unverified
parent 98845e4840
commit 6d15bb3d17
38 changed files with 193 additions and 34 deletions
@@ -200,6 +200,7 @@ impl ExternalAgentSessionImporter {
};
let now = Utc::now();
let create_params = CreateThreadParams {
session_id: thread_id.into(),
thread_id,
extra_config: None,
forked_from_id: None,
@@ -1004,6 +1004,7 @@ mod thread_processor_behavior_tests {
let timestamp = "2025-09-05T16:53:11.850Z".to_string();
let session_meta = SessionMeta {
session_id: conversation_id.into(),
id: conversation_id,
timestamp: timestamp.clone(),
model_provider: None,
@@ -1060,6 +1061,7 @@ mod thread_processor_behavior_tests {
let timestamp = "2025-09-05T16:53:11.850Z".to_string();
let session_meta = SessionMeta {
session_id: parent_thread_id.into(),
id: conversation_id,
timestamp: timestamp.clone(),
source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
@@ -1110,6 +1112,7 @@ mod thread_processor_behavior_tests {
let timestamp = "2025-09-05T16:53:11.850Z".to_string();
let session_meta = SessionMeta {
session_id: conversation_id.into(),
id: conversation_id,
forked_from_id: Some(forked_from_id),
timestamp: timestamp.clone(),
@@ -13,6 +13,7 @@ fn extract_conversation_summary_prefers_plain_user_messages() -> Result<()> {
let head = vec![
json!({
"session_id": conversation_id.to_string(),
"id": conversation_id.to_string(),
"timestamp": timestamp,
"cwd": "/",
+9 -1
View File
@@ -1,4 +1,5 @@
use anyhow::Result;
use codex_protocol::SessionId;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GitInfo;
@@ -127,11 +128,12 @@ pub fn create_fake_rollout_with_source(
model_provider,
git_info,
source,
/*session_id*/ None,
/*parent_thread_id*/ None,
)
}
/// Create a minimal rollout file with an explicit session source and control parent.
/// Create a minimal rollout file with an explicit root session and control parent.
#[allow(clippy::too_many_arguments)]
pub fn create_fake_parented_rollout_with_source(
codex_home: &Path,
@@ -141,6 +143,7 @@ pub fn create_fake_parented_rollout_with_source(
model_provider: Option<&str>,
git_info: Option<GitInfo>,
source: SessionSource,
session_id: SessionId,
parent_thread_id: ThreadId,
) -> Result<String> {
create_fake_rollout_with_source_and_parent_thread_id(
@@ -151,6 +154,7 @@ pub fn create_fake_parented_rollout_with_source(
model_provider,
git_info,
source,
Some(session_id),
Some(parent_thread_id),
)
}
@@ -164,11 +168,13 @@ fn create_fake_rollout_with_source_and_parent_thread_id(
model_provider: Option<&str>,
git_info: Option<GitInfo>,
source: SessionSource,
session_id: Option<SessionId>,
parent_thread_id: Option<ThreadId>,
) -> Result<String> {
let uuid = Uuid::new_v4();
let uuid_str = uuid.to_string();
let conversation_id = ThreadId::from_string(&uuid_str)?;
let session_id = session_id.unwrap_or_else(|| conversation_id.into());
let file_path = rollout_path(codex_home, filename_ts, &uuid_str);
let dir = file_path
@@ -178,6 +184,7 @@ fn create_fake_rollout_with_source_and_parent_thread_id(
// Build JSONL lines
let meta = SessionMeta {
session_id,
id: conversation_id,
forked_from_id: None,
parent_thread_id,
@@ -264,6 +271,7 @@ pub fn create_fake_rollout_with_text_elements(
// Build JSONL lines
let meta = SessionMeta {
session_id: conversation_id.into(),
id: conversation_id,
forked_from_id: None,
parent_thread_id: None,
@@ -121,6 +121,7 @@ async fn get_conversation_summary_by_thread_id_reads_pathless_store_thread() ->
let thread_id = ThreadId::from_string("00000000-0000-4000-8000-000000000125")?;
store
.create_thread(CreateThreadParams {
session_id: thread_id.into(),
thread_id,
extra_config: None,
forked_from_id: None,
@@ -300,7 +300,7 @@ async fn review_start_sends_parent_lineage_in_turn_metadata_for_thread_fork_v2()
}
#[tokio::test]
async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() -> Result<()> {
async fn turn_start_sends_nested_subagent_lineage_after_cold_thread_resume_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
@@ -321,6 +321,8 @@ async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() -
/*supports_websockets*/ false,
)?;
let root_thread_id = CoreThreadId::new();
let root_thread_id_str = root_thread_id.to_string();
let parent_thread_id = CoreThreadId::new();
let parent_thread_id_str = parent_thread_id.to_string();
let subagent_thread_id = create_fake_parented_rollout_with_source(
@@ -331,6 +333,7 @@ async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() -
Some("mock_provider"),
/*git_info*/ None,
SessionSource::SubAgent(SubAgentSource::Other("guardian".to_string())),
root_thread_id.into(),
parent_thread_id,
)?;
@@ -350,6 +353,7 @@ async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() -
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(thread.id, subagent_thread_id);
assert_eq!(thread.session_id, root_thread_id_str);
assert_eq!(thread.parent_thread_id, Some(parent_thread_id_str.clone()));
assert_eq!(
thread.source,
@@ -390,6 +394,10 @@ async fn turn_start_sends_other_subagent_lineage_after_cold_thread_resume_v2() -
Some(parent_thread_id_str.as_str())
);
assert_eq!(metadata["subagent_kind"].as_str(), Some("guardian"));
assert_eq!(
metadata["session_id"].as_str(),
Some(thread.session_id.as_str())
);
assert_eq!(metadata["thread_id"].as_str(), Some(thread.id.as_str()));
assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str()));
assert!(metadata.get("forked_from_thread_id").is_none());
@@ -147,6 +147,7 @@ async fn thread_delete_with_non_local_thread_store_does_not_create_local_persist
let unloaded_thread_id = ThreadId::from_string(&Uuid::new_v4().to_string())?;
thread_store
.create_thread(StoreCreateThreadParams {
session_id: unloaded_thread_id.into(),
thread_id: unloaded_thread_id,
extra_config: None,
forked_from_id: None,
@@ -1252,6 +1252,7 @@ async fn thread_list_filters_by_subagent_variant() -> Result<()> {
Some("mock_provider"),
/*git_info*/ None,
CoreSessionSource::SubAgent(SubAgentSource::Review),
parent_thread_id.into(),
parent_thread_id,
)?;
let compact_id = create_fake_rollout_with_source(
@@ -1357,6 +1357,7 @@ async fn seed_pathless_store_thread(
) -> Result<()> {
store
.create_thread(CreateThreadParams {
session_id: thread_id.into(),
thread_id,
extra_config: None,
forked_from_id: None,
@@ -2054,6 +2054,7 @@ stream_max_retries = 0
let rollout_dir = rollout_path.parent().expect("rollout parent directory");
std::fs::create_dir_all(rollout_dir)?;
let session_meta = SessionMeta {
session_id: conversation_id.into(),
id: conversation_id,
forked_from_id: None,
parent_thread_id: None,
@@ -2735,6 +2736,7 @@ async fn thread_resume_rejects_mismatched_path_for_running_thread_id() -> Result
"timestamp": "2025-01-01T00:00:00Z",
"type": "session_meta",
"payload": {
"session_id": thread_uuid,
"id": thread_uuid,
"timestamp": "2025-01-01T00:00:00Z",
"cwd": codex_home.path(),
@@ -208,6 +208,7 @@ async fn thread_unarchive_preserves_pathless_store_metadata() -> Result<()> {
let parent_thread_id = ThreadId::from_string("00000000-0000-4000-8000-000000000127")?;
store
.create_thread(CreateThreadParams {
session_id: thread_id.into(),
thread_id,
extra_config: None,
forked_from_id: Some(parent_thread_id),
+3 -1
View File
@@ -816,11 +816,13 @@ mod tests {
};
std::fs::create_dir_all(&root).expect("rollout dir");
let path = root.join(format!("rollout-{timestamp}-{thread_id}.jsonl"));
let parsed_thread_id = ThreadId::from_string(thread_id).expect("thread id");
let rollout_line = RolloutLine {
timestamp: timestamp.to_string(),
item: RolloutItem::SessionMeta(codex_protocol::protocol::SessionMetaLine {
meta: codex_protocol::protocol::SessionMeta {
id: ThreadId::from_string(thread_id).expect("thread id"),
session_id: parsed_thread_id.into(),
id: parsed_thread_id,
timestamp: timestamp.to_string(),
cwd: self.codex_home.path().to_path_buf(),
originator: "test".to_string(),
@@ -43,6 +43,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R
let session_meta = SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
+28 -11
View File
@@ -525,6 +525,33 @@ impl Session {
}
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
};
let resumed_session_id = match &initial_history {
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.session_id),
_ => None,
})
}
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None,
};
// Legacy subagent rollouts synthesize session_id from their own thread id.
let resumed_session_id = resumed_session_id.filter(|session_id| {
!session_configuration.session_source.is_non_root_agent()
|| *session_id != SessionId::from(thread_id)
});
let session_id = resumed_session_id.unwrap_or_else(|| {
if session_configuration.session_source.is_non_root_agent() {
agent_control.session_id()
} else {
SessionId::from(thread_id)
}
});
let agent_control = agent_control.with_session_id(
session_id,
config
.effective_agent_max_threads(MultiAgentVersion::V2)
.unwrap_or(usize::MAX),
);
let time_provider = crate::current_time::resolve_time_provider(
config.current_time_reminder.as_ref(),
external_time_provider,
@@ -546,6 +573,7 @@ impl Session {
let live_thread = match &initial_history {
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => {
let params = CreateThreadParams {
session_id,
thread_id,
extra_config: config.extra_config.clone(),
forked_from_id,
@@ -952,17 +980,6 @@ impl Session {
config.analytics_enabled,
)
});
let session_id = if session_configuration.session_source.is_non_root_agent() {
agent_control.session_id()
} else {
SessionId::from(thread_id)
};
let agent_control = agent_control.with_session_id(
session_id,
config
.effective_agent_max_threads(MultiAgentVersion::V2)
.unwrap_or(usize::MAX),
);
// Keep one stable manager handle for the session so extension resource clients
// automatically observe the manager installed at startup and on later refreshes.
let mcp_connection_manager = Arc::new(arc_swap::ArcSwap::from_pointee(
+14 -3
View File
@@ -1900,6 +1900,7 @@ fn session_meta_item(
) -> RolloutItem {
RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
multi_agent_version,
..SessionMeta::default()
@@ -3750,6 +3751,7 @@ async fn attach_thread_persistence(session: &mut Session) -> PathBuf {
let live_thread = LiveThread::create(
Arc::clone(&session.services.thread_store),
CreateThreadParams {
session_id: session.session_id(),
thread_id: session.thread_id,
extra_config: None,
forked_from_id: None,
@@ -5453,7 +5455,7 @@ async fn resumed_root_session_uses_thread_id_as_session_id() {
}
#[tokio::test]
async fn resumed_subagent_session_keeps_inherited_session_id() {
async fn resumed_subagent_session_restores_persisted_session_id() {
let parent_thread_id = ThreadId::new();
let parent_session_id = SessionId::from(parent_thread_id);
let thread_id = ThreadId::new();
@@ -5467,11 +5469,19 @@ async fn resumed_subagent_session_keeps_inherited_session_id() {
let (session, rx_event) = make_session_with_history_source_and_agent_control_and_rx(
InitialHistory::Resumed(ResumedHistory {
conversation_id: thread_id,
history: Vec::new(),
history: vec![RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
session_id: parent_session_id,
id: thread_id,
source: session_source.clone(),
..SessionMeta::default()
},
git: None,
})],
rollout_path: None,
}),
session_source,
AgentControl::default().with_session_id(parent_session_id, /*max_threads*/ usize::MAX),
AgentControl::default(),
)
.await
.expect("resume should succeed");
@@ -6590,6 +6600,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() {
let live_thread = LiveThread::create(
Arc::clone(&thread_store),
CreateThreadParams {
session_id: session.session_id(),
thread_id: session.thread_id,
extra_config: None,
forked_from_id: None,
+7 -2
View File
@@ -516,6 +516,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
"timestamp": "2024-01-01T00:00:00.000Z",
"type": "session_meta",
"payload": {
"session_id": convo_id,
"id": convo_id,
"timestamp": "2024-01-01T00:00:00Z",
"instructions": "be nice",
@@ -726,12 +727,14 @@ async fn resume_replays_legacy_js_repl_image_rollout_shapes() {
metadata: None,
};
let legacy_image_url = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==";
let thread_id = ThreadId::default();
let rollout = vec![
RolloutLine {
timestamp: "2024-01-01T00:00:00.000Z".to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: ThreadId::default(),
session_id: thread_id.into(),
id: thread_id,
parent_thread_id: None,
timestamp: "2024-01-01T00:00:00Z".to_string(),
cwd: ".".into(),
@@ -860,12 +863,14 @@ async fn resume_replays_image_tool_outputs_with_detail() {
let image_url = "data:image/webp;base64,UklGRiIAAABXRUJQVlA4IBYAAAAwAQCdASoBAAEAAUAmJaACdLoB+AADsAD+8ut//NgVzXPv9//S4P0uD9Lg/9KQAAA=";
let function_call_id = "view-image-call";
let custom_call_id = "js-repl-call";
let thread_id = ThreadId::default();
let rollout = vec![
RolloutLine {
timestamp: "2024-01-01T00:00:00.000Z".to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: ThreadId::default(),
session_id: thread_id.into(),
id: thread_id,
parent_thread_id: None,
timestamp: "2024-01-01T00:00:00Z".to_string(),
cwd: ".".into(),
@@ -59,6 +59,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R
let session_meta = SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
@@ -109,6 +110,7 @@ async fn write_rollout_with_meta_only(dir: &Path, thread_id: ThreadId) -> io::Re
let session_meta = SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
+1
View File
@@ -511,6 +511,7 @@ async fn review_input_isolated_from_parent_history() {
"timestamp": "2024-01-01T00:00:00.000Z",
"type": "session_meta",
"payload": {
"session_id": convo_id,
"id": convo_id,
"timestamp": "2024-01-01T00:00:00Z",
"cwd": ".",
@@ -42,6 +42,7 @@ fn write_minimal_rollout_with_id_at_path(file: &Path, id: Uuid) {
"timestamp": "2024-01-01T00:00:00.000Z",
"type": "session_meta",
"payload": {
"session_id": id,
"id": id,
"timestamp": "2024-01-01T00:00:00Z",
"cwd": ".",
@@ -356,6 +356,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
fs::create_dir_all(parent).expect("should create rollout directory");
let session_meta_line = SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
+38 -2
View File
@@ -57,7 +57,9 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_path_uri::PathUri;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::de::Error as _;
use serde_json::Value;
use serde_with::serde_as;
use strum_macros::Display;
@@ -2914,6 +2916,7 @@ pub enum MultiAgentVersion {
/// and should be used when there is no config override.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)]
pub struct SessionMeta {
pub session_id: SessionId,
pub id: ThreadId,
#[serde(skip_serializing_if = "Option::is_none")]
pub forked_from_id: Option<ThreadId>,
@@ -2956,8 +2959,10 @@ pub struct SessionMeta {
impl Default for SessionMeta {
fn default() -> Self {
let id = ThreadId::default();
SessionMeta {
id: ThreadId::default(),
session_id: id.into(),
id,
forked_from_id: None,
parent_thread_id: None,
timestamp: String::new(),
@@ -2978,7 +2983,7 @@ impl Default for SessionMeta {
}
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
#[derive(Serialize, Debug, Clone, JsonSchema, TS)]
pub struct SessionMetaLine {
#[serde(flatten)]
pub meta: SessionMeta,
@@ -2986,6 +2991,35 @@ pub struct SessionMetaLine {
pub git: Option<GitInfo>,
}
impl<'de> Deserialize<'de> for SessionMetaLine {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
struct SessionMetaLineFields {
#[serde(flatten)]
meta: SessionMeta,
git: Option<GitInfo>,
}
let mut value = Value::deserialize(deserializer)?;
let fields = value
.as_object_mut()
.ok_or_else(|| D::Error::custom("session metadata must be an object"))?;
if !fields.contains_key("session_id") {
let thread_id = fields
.get("id")
.cloned()
.ok_or_else(|| D::Error::missing_field("id"))?;
fields.insert("session_id".to_string(), thread_id);
}
let SessionMetaLineFields { meta, git } =
serde_json::from_value(value).map_err(D::Error::custom)?;
Ok(Self { meta, git })
}
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum RolloutItem {
@@ -5390,6 +5424,7 @@ mod tests {
let thread_id = ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")?;
let older_meta = SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
multi_agent_version: Some(MultiAgentVersion::V2),
..Default::default()
@@ -5398,6 +5433,7 @@ mod tests {
};
let newer_meta_without_version = SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
multi_agent_version: None,
..Default::default()
@@ -456,6 +456,7 @@ fn write_rollout(path: &std::path::Path, thread_id: ThreadId, message: &str) ->
fs::create_dir_all(parent)?;
let session_meta_line = SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
+3
View File
@@ -33,6 +33,7 @@ async fn extract_metadata_from_rollout_uses_session_meta() {
.join(format!("rollout-2026-01-27T12-34-56-{uuid}.jsonl"));
let session_meta = SessionMeta {
session_id: id.into(),
id,
forked_from_id: None,
parent_thread_id: None,
@@ -88,6 +89,7 @@ async fn extract_metadata_from_rollout_returns_latest_memory_mode() {
.join(format!("rollout-2026-01-27T12-34-56-{uuid}.jsonl"));
let session_meta = SessionMeta {
session_id: id.into(),
id,
forked_from_id: None,
parent_thread_id: None,
@@ -355,6 +357,7 @@ fn write_rollout_in_sessions_with_cwd(
std::fs::create_dir_all(sessions_dir.as_path()).expect("create sessions dir");
let path = sessions_dir.join(format!("rollout-{filename_ts}-{thread_uuid}.jsonl"));
let session_meta = SessionMeta {
session_id: id.into(),
id,
forked_from_id: None,
parent_thread_id: None,
+17 -5
View File
@@ -10,6 +10,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use chrono::SecondsFormat;
use codex_protocol::SessionId;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::models::BaseInstructions;
@@ -81,10 +82,11 @@ pub struct RolloutRecorder {
#[derive(Clone)]
pub enum RolloutRecorderParams {
Create {
session_id: SessionId,
conversation_id: ThreadId,
forked_from_id: Option<ThreadId>,
parent_thread_id: Option<ThreadId>,
source: SessionSource,
source: Box<SessionSource>,
thread_source: Option<ThreadSource>,
base_instructions: BaseInstructions,
dynamic_tools: Vec<DynamicToolSpec>,
@@ -167,10 +169,11 @@ impl RolloutRecorderParams {
dynamic_tools: Vec<DynamicToolSpec>,
) -> Self {
Self::Create {
session_id: conversation_id.into(),
conversation_id,
forked_from_id,
parent_thread_id,
source,
source: Box::new(source),
thread_source,
base_instructions,
dynamic_tools,
@@ -178,6 +181,13 @@ impl RolloutRecorderParams {
}
}
pub fn with_session_id(mut self, session_id: SessionId) -> Self {
if let Self::Create { session_id: id, .. } = &mut self {
*id = session_id;
}
self
}
pub fn with_multi_agent_version(
mut self,
multi_agent_version: Option<MultiAgentVersion>,
@@ -696,6 +706,7 @@ impl RolloutRecorder {
) -> std::io::Result<Self> {
let (file, deferred_log_file_info, rollout_path, meta) = match params {
RolloutRecorderParams::Create {
session_id,
conversation_id,
forked_from_id,
parent_thread_id,
@@ -707,7 +718,7 @@ impl RolloutRecorder {
} => {
let log_file_info = precompute_log_file_info(config, conversation_id)?;
let path = log_file_info.path.clone();
let session_id = log_file_info.conversation_id;
let thread_id = log_file_info.conversation_id;
let started_at = log_file_info.timestamp;
let timestamp_format: &[FormatItem] = format_description!(
@@ -719,7 +730,8 @@ impl RolloutRecorder {
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let session_meta = SessionMeta {
id: session_id,
session_id,
id: thread_id,
forked_from_id,
parent_thread_id,
timestamp,
@@ -729,7 +741,7 @@ impl RolloutRecorder {
agent_nickname: source.get_nickname(),
agent_role: source.get_agent_role(),
agent_path: source.get_agent_path().map(Into::into),
source,
source: *source,
thread_source,
model_provider: Some(config.model_provider_id().to_string()),
base_instructions: Some(base_instructions),
+20 -7
View File
@@ -3,6 +3,7 @@
use super::*;
use crate::config::RolloutConfig;
use chrono::TimeZone;
use codex_protocol::SessionId;
use codex_protocol::ThreadId;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentMessageEvent;
@@ -45,6 +46,7 @@ fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result<Path
"timestamp": ts,
"type": "session_meta",
"payload": {
"session_id": uuid,
"id": uuid,
"timestamp": ts,
"cwd": ".",
@@ -83,6 +85,7 @@ async fn state_db_init_backfills_before_returning() -> anyhow::Result<()> {
let session_meta_line = SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
@@ -145,7 +148,7 @@ async fn state_db_init_backfills_before_returning() -> anyhow::Result<()> {
}
#[tokio::test]
async fn load_rollout_items_skips_legacy_ghost_snapshot_lines() -> std::io::Result<()> {
async fn load_rollout_items_defaults_legacy_session_id() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let rollout_path = home.path().join("rollout.jsonl");
let mut file = File::create(&rollout_path)?;
@@ -210,7 +213,10 @@ async fn load_rollout_items_skips_legacy_ghost_snapshot_lines() -> std::io::Resu
assert_eq!(loaded_thread_id, Some(thread_id));
assert_eq!(parse_errors, 0);
assert_eq!(items.len(), 2);
assert!(matches!(items[0], RolloutItem::SessionMeta(_)));
let RolloutItem::SessionMeta(session_meta) = &items[0] else {
panic!("expected session metadata");
};
assert_eq!(session_meta.meta.session_id, SessionId::from(thread_id));
assert!(matches!(
items[1],
RolloutItem::ResponseItem(ResponseItem::Message { .. })
@@ -234,6 +240,7 @@ async fn load_rollout_items_preserves_legacy_guardian_assessment_lines() -> std:
"timestamp": ts,
"type": "session_meta",
"payload": {
"session_id": thread_id,
"id": thread_id,
"timestamp": ts,
"cwd": ".",
@@ -297,6 +304,7 @@ async fn load_rollout_items_filters_legacy_ghost_snapshots_from_compaction_histo
"timestamp": ts,
"type": "session_meta",
"payload": {
"session_id": thread_id,
"id": thread_id,
"timestamp": ts,
"cwd": ".",
@@ -365,6 +373,7 @@ async fn load_rollout_items_filters_legacy_ghost_snapshots_from_compaction_histo
async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let session_id = SessionId::default();
let thread_id = ThreadId::new();
let recorder = RolloutRecorder::new(
&config,
@@ -376,7 +385,8 @@ async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result<
/*thread_source*/ None,
BaseInstructions::default(),
Vec::new(),
),
)
.with_session_id(session_id),
)
.await?;
@@ -421,10 +431,12 @@ async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result<
assert!(rollout_path.exists(), "rollout file should be materialized");
let text = std::fs::read_to_string(&rollout_path)?;
assert!(
text.contains("\"type\":\"session_meta\""),
"expected session metadata in rollout"
);
let first_line = text.lines().next().expect("session metadata line");
let session_meta: RolloutLine = serde_json::from_str(first_line)?;
let RolloutItem::SessionMeta(session_meta) = session_meta.item else {
panic!("expected session metadata in rollout");
};
assert_eq!(session_meta.meta.session_id, session_id);
let buffered_idx = text
.find("buffered-event")
.expect("buffered event in rollout");
@@ -732,6 +744,7 @@ async fn list_threads_state_db_only_skips_jsonl_repair_scan() -> std::io::Result
"timestamp": ts,
"type": "session_meta",
"payload": {
"session_id": uuid,
"id": uuid,
"timestamp": ts,
"cwd": home.path().display().to_string(),
@@ -25,6 +25,7 @@ fn write_rollout_with_metadata(path: &Path, thread_id: ThreadId) -> std::io::Res
timestamp: timestamp.clone(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
+1
View File
@@ -158,6 +158,7 @@ fn write_rollout_with_user_message(
timestamp: "2026-06-01T14:26:25Z".to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
+7 -1
View File
@@ -297,6 +297,7 @@ fn write_session_file_with_provider(
let mut file = File::create(file_path)?;
let mut payload = serde_json::json!({
"session_id": uuid,
"id": uuid,
"timestamp": ts_str,
"cwd": ".",
@@ -370,6 +371,7 @@ fn write_goal_started_session_file(
"timestamp": ts_str,
"type": "session_meta",
"payload": {
"session_id": uuid,
"id": uuid,
"timestamp": ts_str,
"cwd": ".",
@@ -451,6 +453,7 @@ fn write_session_file_with_delayed_user_event(
Uuid::from_u128(100 + i as u128)
};
let payload = serde_json::json!({
"session_id": uuid,
"id": id,
"timestamp": ts_str,
"cwd": ".",
@@ -483,8 +486,9 @@ fn write_session_file_with_meta_payload(
root: &Path,
ts_str: &str,
uuid: Uuid,
payload: serde_json::Value,
mut payload: serde_json::Value,
) -> std::io::Result<()> {
payload["session_id"] = serde_json::json!(uuid);
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let dt = PrimitiveDateTime::parse(ts_str, format)
@@ -1088,6 +1092,7 @@ async fn test_get_thread_contents() {
"timestamp": ts,
"type": "session_meta",
"payload": {
"session_id": uuid,
"id": uuid,
"timestamp": ts,
"cwd": ".",
@@ -1267,6 +1272,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
timestamp: ts.to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
session_id: conversation_id.into(),
id: conversation_id,
forked_from_id: None,
parent_thread_id: None,
+2
View File
@@ -321,6 +321,7 @@ mod tests {
&mut metadata,
&RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: Some(
ThreadId::from_string(&Uuid::now_v7().to_string()).expect("thread id"),
@@ -513,6 +514,7 @@ mod tests {
&mut metadata,
&RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
+2
View File
@@ -1957,6 +1957,7 @@ mod tests {
);
let items = vec![RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
@@ -2018,6 +2019,7 @@ mod tests {
);
let items = vec![RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
forked_from_id: None,
parent_thread_id: None,
+2
View File
@@ -110,6 +110,7 @@ mod tests {
] {
store
.create_thread(CreateThreadParams {
session_id: thread_id.into(),
thread_id,
extra_config: None,
forked_from_id: None,
@@ -231,6 +232,7 @@ impl InMemoryThreadStore {
let mut state = self.state.lock().await;
state.calls.create_thread += 1;
let session_meta = SessionMeta {
session_id: params.session_id,
id: params.thread_id,
forked_from_id: params.forked_from_id,
parent_thread_id: params.parent_thread_id,
@@ -36,6 +36,7 @@ pub(super) async fn create_thread(
params.base_instructions,
params.dynamic_tools,
)
.with_session_id(params.session_id)
.with_multi_agent_version(params.multi_agent_version),
)
.await
+1
View File
@@ -1123,6 +1123,7 @@ mod tests {
fn create_thread_params(thread_id: ThreadId) -> CreateThreadParams {
CreateThreadParams {
session_id: thread_id.into(),
thread_id,
extra_config: None,
forked_from_id: None,
@@ -821,6 +821,7 @@ mod tests {
"timestamp": "2025-01-03T12:00:00Z",
"type": "session_meta",
"payload": {
"session_id": uuid,
"id": uuid,
"timestamp": "2025-01-03T12:00:00Z",
"cwd": rollout_cwd,
@@ -926,6 +927,7 @@ mod tests {
"timestamp": "2025-01-03T12-00-00",
"type": "session_meta",
"payload": {
"session_id": uuid,
"id": uuid,
"timestamp": "2025-01-03T12-00-00",
"cwd": home.path(),
@@ -1091,6 +1093,7 @@ mod tests {
"timestamp": "2025-01-03T12:00:00Z",
"type": "session_meta",
"payload": {
"session_id": uuid,
"id": uuid,
"timestamp": "2025-01-03T12:00:00Z",
"cwd": home.path(),
@@ -77,6 +77,7 @@ pub(super) fn write_session_file_with_fork(
"timestamp": ts,
"type": "session_meta",
"payload": {
"session_id": uuid,
"id": uuid,
"forked_from_id": forked_from_id,
"timestamp": ts,
@@ -579,6 +579,7 @@ mod tests {
fn session_meta(thread_id: ThreadId) -> SessionMetaLine {
SessionMetaLine {
meta: SessionMeta {
session_id: thread_id.into(),
id: thread_id,
timestamp: "2025-01-03T12:00:00Z".to_string(),
source: SessionSource::Exec,
+3
View File
@@ -2,6 +2,7 @@ use std::path::PathBuf;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::SessionId;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::models::BaseInstructions;
@@ -63,6 +64,8 @@ pub struct ExtraConfig {}
/// Parameters required to create a persisted thread.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateThreadParams {
/// Session id shared by the root thread and all of its subagents.
pub session_id: SessionId,
/// Thread id generated by Codex before opening persistence.
pub thread_id: ThreadId,
/// Optional extra configuration fields for the thread.
+1
View File
@@ -2041,6 +2041,7 @@ mod tests {
std::fs::create_dir_all(parent)?;
let session_meta = codex_protocol::protocol::SessionMeta {
session_id: thread_id.into(),
id: thread_id,
timestamp: meta_rfc3339.to_string(),
cwd: cwd.to_path_buf(),