diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index d22a023a3..c513cbb55 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -3446,28 +3446,21 @@ impl Session { .build_initial_context_with_world_state(turn_context, world_state.as_ref()) .await; let turn_context_item = turn_context.to_turn_context_item(); - let replacement_history = context_items; - { - let mut state = self.state.lock().await; - state.replace_history(replacement_history.clone(), Some(turn_context_item.clone())); - state.history.set_world_state_baseline(world_state); - }; - self.persist_rollout_items(&[ - RolloutItem::Compacted(CompactedItem { + self.replace_compacted_history( + turn_context, + context_items, + Some(turn_context_item), + Some(world_state), + CompactedItem { message: String::new(), - replacement_history: Some(replacement_history), + replacement_history: None, window_number: Some(window_number), first_window_id: Some(window_ids.first_window_id.to_string()), previous_window_id: window_ids.previous_window_id.map(|id| id.to_string()), window_id: Some(window_ids.window_id.to_string()), - }), - RolloutItem::TurnContext(turn_context_item), - ]) + }, + ) .await; - { - let mut state = self.state.lock().await; - state.queue_pending_session_start_source(codex_hooks::SessionStartSource::Compact); - } self.recompute_token_usage(turn_context).await; window_number } diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 2da1b9a81..48e48b6f3 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -2686,6 +2686,56 @@ async fn record_initial_history_reconstructs_forked_transcript() { assert_eq!(expected, history.raw_items()); } +#[tokio::test] +async fn start_new_context_window_assigns_and_persists_item_ids() { + let (mut session, turn_context, _rx) = make_session_and_context_with_auth_and_config_and_rx( + CodexAuth::from_api_key("Test API Key"), + Vec::new(), + |config| { + let _ = config.features.enable(Feature::ItemIds); + }, + ) + .await; + let rollout_path = + attach_thread_persistence(Arc::get_mut(&mut session).expect("unique session")).await; + let world_state = Arc::new( + build_world_state_from_turn_context(session.as_ref(), turn_context.as_ref()).await, + ); + + session + .start_new_context_window(turn_context.as_ref(), world_state) + .await; + + let live_history = session.clone_history().await; + assert!(!live_history.raw_items().is_empty()); + assert!( + live_history + .raw_items() + .iter() + .all(|item| item.id().is_some()) + ); + + session.flush_rollout().await.expect("rollout should flush"); + let InitialHistory::Resumed(resumed) = RolloutRecorder::get_rollout_history(&rollout_path) + .await + .expect("read rollout history") + else { + panic!("expected resumed rollout history"); + }; + let persisted_replacement_history = resumed.history.iter().rev().find_map(|item| match item { + RolloutItem::Compacted(compacted) => compacted.replacement_history.as_ref(), + RolloutItem::SessionMeta(_) + | RolloutItem::ResponseItem(_) + | RolloutItem::InterAgentCommunication(_) + | RolloutItem::TurnContext(_) + | RolloutItem::EventMsg(_) => None, + }); + assert_eq!( + persisted_replacement_history.map(Vec::as_slice), + Some(live_history.raw_items()) + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn session_configured_reports_permission_profile_for_external_sandbox() -> anyhow::Result<()> {