From b4f0f3eff1303ea445a218afb039e0079df0afce Mon Sep 17 00:00:00 2001 From: jif Date: Wed, 24 Jun 2026 15:43:10 +0100 Subject: [PATCH] Persist agent messages as response items (#29829) ## Why Inter-agent messages are recorded in live history as `ResponseItem::AgentMessage`, but rollouts stored `InterAgentCommunication` and rebuilt the response item during resume. This made the rollout differ from the actual Responses history. ## What changed - store the prepared `agent_message` response item directly - keep `trigger_turn` in a small local metadata record for fork truncation - keep reading older `inter_agent_communication` rollout items --- .../src/protocol/thread_history.rs | 1 + codex-rs/core/src/agent/control/spawn.rs | 3 +- codex-rs/core/src/agent/control_tests.rs | 1 + codex-rs/core/src/session/mod.rs | 11 +++-- .../src/session/rollout_reconstruction.rs | 2 + codex-rs/core/src/session/tests.rs | 34 +++++++++++++--- .../core/src/thread_rollout_truncation.rs | 15 ++++++- .../src/thread_rollout_truncation_tests.rs | 40 +++++++++++++++++++ codex-rs/memories/write/src/phase1.rs | 22 ++++++++++ codex-rs/protocol/src/protocol.rs | 8 +++- codex-rs/rollout/src/list.rs | 4 +- codex-rs/rollout/src/metadata.rs | 2 + codex-rs/rollout/src/persistence_metrics.rs | 3 ++ codex-rs/rollout/src/policy.rs | 7 ++-- codex-rs/rollout/src/recorder.rs | 1 + codex-rs/rollout/src/search.rs | 1 + codex-rs/state/src/extract.rs | 4 +- codex-rs/state/src/runtime/threads.rs | 1 + .../thread-store/src/thread_metadata_sync.rs | 1 + 19 files changed, 144 insertions(+), 17 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index f9ebc93bf..2df9a3dc5 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -386,6 +386,7 @@ impl ThreadHistoryBuilder { RolloutItem::Compacted(payload) => self.handle_compacted(payload), RolloutItem::ResponseItem(item) => self.handle_response_item(item), RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {} } diff --git a/codex-rs/core/src/agent/control/spawn.rs b/codex-rs/core/src/agent/control/spawn.rs index e208b418d..cdb3d352a 100644 --- a/codex-rs/core/src/agent/control/spawn.rs +++ b/codex-rs/core/src/agent/control/spawn.rs @@ -58,7 +58,8 @@ fn keep_forked_rollout_item(item: &RolloutItem, preserve_reference_context_item: | ResponseItem::ContextCompaction { .. } | ResponseItem::Other, ) => false, - RolloutItem::InterAgentCommunication(_) => false, + RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } => false, // Full-history forks preserve the cached prompt prefix and can keep diffing // from the parent's durable baseline. Truncated forks drop part of that prompt, // so they must rebuild context on their first child turn. diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 225fd4ba9..05b824253 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -163,6 +163,7 @@ async fn persisted_originator(thread: &CodexThread) -> String { RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.originator.clone()), RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::EventMsg(_) | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) => None, diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index c7af52ced..00d2fdca3 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -2841,7 +2841,7 @@ impl Session { std::slice::from_ref(&response_item), ); let items = items.as_ref(); - communication.id = items.first().and_then(ResponseItem::id).map(str::to_string); + let response_item = items[0].clone(); { let mut state = self.state.lock().await; state.record_items( @@ -2849,8 +2849,13 @@ impl Session { turn_context.model_info.truncation_policy.into(), ); } - self.persist_rollout_items(&[RolloutItem::InterAgentCommunication(communication)]) - .await; + self.persist_rollout_items(&[ + RolloutItem::InterAgentCommunicationMetadata { + trigger_turn: communication.trigger_turn, + }, + RolloutItem::ResponseItem(response_item), + ]) + .await; self.send_raw_response_items(turn_context, items).await; } diff --git a/codex-rs/core/src/session/rollout_reconstruction.rs b/codex-rs/core/src/session/rollout_reconstruction.rs index b3fbeb103..aaf0b8434 100644 --- a/codex-rs/core/src/session/rollout_reconstruction.rs +++ b/codex-rs/core/src/session/rollout_reconstruction.rs @@ -264,6 +264,7 @@ impl Session { active_segment.get_or_insert_with(ActiveReplaySegment::default); active_segment.counts_as_user_turn = true; } + RolloutItem::InterAgentCommunicationMetadata { .. } => {} RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => {} } @@ -320,6 +321,7 @@ impl Session { turn_context.model_info.truncation_policy.into(), ); } + RolloutItem::InterAgentCommunicationMetadata { .. } => {} RolloutItem::Compacted(compacted) => { if let Some(replacement_history) = &compacted.replacement_history { // This should actually never happen, because the reverse loop above (to build rollout_suffix) diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index cbc2128f5..5b2e1832c 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -1766,6 +1766,29 @@ async fn record_inter_agent_communication_sets_turn_id_in_rollout_and_resume() { else { panic!("expected resumed rollout history"); }; + let persisted_items = resumed + .history + .iter() + .filter(|item| { + matches!( + item, + RolloutItem::ResponseItem(_) + | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } + ) + }) + .cloned() + .collect::>(); + let expected_persisted_items = vec![ + RolloutItem::InterAgentCommunicationMetadata { + trigger_turn: false, + }, + RolloutItem::ResponseItem(expected_item.clone()), + ]; + assert_eq!( + serde_json::to_value(persisted_items).unwrap(), + serde_json::to_value(expected_persisted_items).unwrap() + ); let (resumed_session, _resumed_turn_context) = make_session_and_context().await; resumed_session @@ -1818,14 +1841,11 @@ async fn record_inter_agent_communication_preserves_item_id_in_rollout_and_resum else { panic!("expected resumed rollout history"); }; - let persisted_communication = resumed.history.iter().find_map(|item| match item { - RolloutItem::InterAgentCommunication(communication) => Some(communication), + let persisted_item_id = resumed.history.iter().find_map(|item| match item { + RolloutItem::ResponseItem(item @ ResponseItem::AgentMessage { .. }) => item.id(), _ => None, }); - assert_eq!( - persisted_communication.and_then(|communication| communication.id.as_deref()), - Some(live_item_id.as_str()) - ); + assert_eq!(persisted_item_id, Some(live_item_id.as_str())); let (resumed_session, _resumed_turn_context, _rx) = make_session_and_context_with_auth_and_config_and_rx( @@ -2727,6 +2747,7 @@ async fn start_new_context_window_assigns_and_persists_item_ids() { RolloutItem::SessionMeta(_) | RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, }); @@ -2783,6 +2804,7 @@ async fn record_initial_history_assigns_and_persists_id_for_forked_response_item RolloutItem::ResponseItem(response_item) => response_item.id(), RolloutItem::SessionMeta(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, diff --git a/codex-rs/core/src/thread_rollout_truncation.rs b/codex-rs/core/src/thread_rollout_truncation.rs index 407586715..25263b14b 100644 --- a/codex-rs/core/src/thread_rollout_truncation.rs +++ b/codex-rs/core/src/thread_rollout_truncation.rs @@ -72,7 +72,14 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec { - if is_user_turn_boundary(item) { + let has_delivery_metadata = matches!(item, ResponseItem::AgentMessage { .. }) + && idx.checked_sub(1).is_some_and(|previous_idx| { + matches!( + items.get(previous_idx), + Some(RolloutItem::InterAgentCommunicationMetadata { .. }) + ) + }); + if is_user_turn_boundary(item) && !has_delivery_metadata { rollback_turn_positions.push(idx); } if is_real_user_message_boundary(item) || is_trigger_turn_boundary(item) { @@ -85,6 +92,12 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec { + rollback_turn_positions.push(idx); + if *trigger_turn { + fork_turn_positions.push(idx); + } + } RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX); if num_turns == 0 { diff --git a/codex-rs/core/src/thread_rollout_truncation_tests.rs b/codex-rs/core/src/thread_rollout_truncation_tests.rs index 2e6d8fde7..3c79a8ac2 100644 --- a/codex-rs/core/src/thread_rollout_truncation_tests.rs +++ b/codex-rs/core/src/thread_rollout_truncation_tests.rs @@ -241,6 +241,46 @@ fn fork_turn_positions_use_inter_agent_delivery_metadata() { assert_eq!(fork_turn_positions_in_rollout(&rollout), vec![0, 3, 5]); } +#[test] +fn fork_turn_positions_use_canonical_agent_messages_and_delivery_metadata() { + let queued = InterAgentCommunication::new( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + Vec::new(), + "queued during user turn".to_string(), + /*trigger_turn*/ false, + ); + let triggered = InterAgentCommunication::new( + AgentPath::root(), + AgentPath::try_from("/root/worker").expect("agent path"), + Vec::new(), + "follow-up task".to_string(), + /*trigger_turn*/ true, + ); + let mut rollout = vec![ + RolloutItem::ResponseItem(user_msg("user task")), + RolloutItem::InterAgentCommunicationMetadata { + trigger_turn: false, + }, + RolloutItem::ResponseItem(queued.to_model_input_item()), + RolloutItem::ResponseItem(assistant_msg("first answer")), + RolloutItem::InterAgentCommunicationMetadata { trigger_turn: true }, + RolloutItem::ResponseItem(triggered.to_model_input_item()), + RolloutItem::ResponseItem(assistant_msg("second answer")), + RolloutItem::ResponseItem(user_msg("next user task")), + ]; + + assert_eq!(fork_turn_positions_in_rollout(&rollout), vec![0, 4, 7]); + + rollout.insert( + 7, + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent { + num_turns: 1, + })), + ); + assert_eq!(fork_turn_positions_in_rollout(&rollout), vec![0, 8]); +} + #[test] fn truncates_rollout_to_last_n_fork_turns_drops_startup_prefix_even_when_under_limit() { let rollout = vec![ diff --git a/codex-rs/memories/write/src/phase1.rs b/codex-rs/memories/write/src/phase1.rs index 5b5ae8166..594f7e7c5 100644 --- a/codex-rs/memories/write/src/phase1.rs +++ b/codex-rs/memories/write/src/phase1.rs @@ -412,6 +412,7 @@ mod job { Some(communication.to_model_input_item()) } RolloutItem::SessionMeta(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, @@ -790,6 +791,27 @@ mod tests { assert_eq!(parsed, expected); } + #[test] + fn serializes_agent_message_response_items_for_memory() { + let communication = InterAgentCommunication::new( + AgentPath::root(), + AgentPath::root().join("worker").expect("agent path"), + Vec::new(), + "delegated task".to_string(), + /*trigger_turn*/ true, + ); + let response_item = communication.to_model_input_item(); + + let serialized = job::serialize_filtered_rollout_response_items(&[ + RolloutItem::InterAgentCommunicationMetadata { trigger_turn: true }, + RolloutItem::ResponseItem(response_item.clone()), + ]) + .expect("serialize"); + let parsed: Vec = serde_json::from_str(&serialized).expect("parse"); + + assert_eq!(parsed, vec![response_item]); + } + #[test] fn count_outcomes_sums_token_usage_across_all_jobs() { let counts = aggregate_stats(vec![ diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 444e44075..138b36f10 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2599,6 +2599,7 @@ impl InitialHistory { RolloutItem::SessionMeta(_) | RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::EventMsg(_) => None, }) @@ -2933,6 +2934,7 @@ fn multi_agent_version_from_items( RolloutItem::SessionMeta(_) | RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::EventMsg(_) => None, }) @@ -3080,8 +3082,12 @@ impl<'de> Deserialize<'de> for SessionMetaLine { pub enum RolloutItem { SessionMeta(SessionMetaLine), ResponseItem(ResponseItem), - /// Durable delivery metadata reconstructed as a model-visible `agent_message`. + /// Legacy delivery item reconstructed as a model-visible `agent_message`. InterAgentCommunication(InterAgentCommunication), + /// Local delivery metadata that is not part of the Responses API item. + InterAgentCommunicationMetadata { + trigger_turn: bool, + }, Compacted(CompactedItem), TurnContext(TurnContextItem), EventMsg(EventMsg), diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index f5013e7aa..cdc2ca17c 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -1151,6 +1151,7 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result {} RolloutItem::TurnContext(_) => { // Not included in `head`; skip. } @@ -1213,7 +1214,8 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result {} } diff --git a/codex-rs/rollout/src/metadata.rs b/codex-rs/rollout/src/metadata.rs index dcb38b6d2..5199312e0 100644 --- a/codex-rs/rollout/src/metadata.rs +++ b/codex-rs/rollout/src/metadata.rs @@ -69,6 +69,7 @@ pub fn builder_from_items( RolloutItem::SessionMeta(meta_line) => Some(meta_line), RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, @@ -123,6 +124,7 @@ pub async fn extract_metadata_from_rollout( RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(), RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, diff --git a/codex-rs/rollout/src/persistence_metrics.rs b/codex-rs/rollout/src/persistence_metrics.rs index c851ea624..104d99fcf 100644 --- a/codex-rs/rollout/src/persistence_metrics.rs +++ b/codex-rs/rollout/src/persistence_metrics.rs @@ -227,6 +227,9 @@ fn rollout_item_type(item: &RolloutItem) -> String { RolloutItem::SessionMeta(_) => "session_meta".to_string(), RolloutItem::ResponseItem(item) => response_item_type(item).to_string(), RolloutItem::InterAgentCommunication(_) => "inter_agent_communication".to_string(), + RolloutItem::InterAgentCommunicationMetadata { .. } => { + "inter_agent_communication_metadata".to_string() + } RolloutItem::Compacted(_) => "compacted".to_string(), RolloutItem::TurnContext(_) => "turn_context".to_string(), RolloutItem::EventMsg(EventMsg::ItemCompleted(event)) => { diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 4db26576b..ec8db3f6f 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -6,7 +6,8 @@ use codex_protocol::models::ResponseItem; pub fn is_persisted_rollout_item(item: &RolloutItem) -> bool { match item { RolloutItem::ResponseItem(item) => should_persist_response_item(item), - RolloutItem::InterAgentCommunication(_) => true, + RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } => true, RolloutItem::EventMsg(ev) => should_persist_event_msg(ev), // Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns). RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => { @@ -55,7 +56,8 @@ pub fn should_persist_response_item(item: &ResponseItem) -> bool { pub fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool { match item { ResponseItem::Message { role, .. } => role != "developer", - ResponseItem::LocalShellCall { .. } + ResponseItem::AgentMessage { .. } + | ResponseItem::LocalShellCall { .. } | ResponseItem::FunctionCall { .. } | ResponseItem::ToolSearchCall { .. } | ResponseItem::FunctionCallOutput { .. } @@ -64,7 +66,6 @@ pub fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool { | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::WebSearchCall { .. } => true, ResponseItem::AdditionalTools { .. } - | ResponseItem::AgentMessage { .. } | ResponseItem::Reasoning { .. } | ResponseItem::ImageGenerationCall { .. } | ResponseItem::Compaction { .. } diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index dad7f8bc3..90edb2c49 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -1856,6 +1856,7 @@ async fn resume_candidate_matches_cwd( RolloutItem::SessionMeta(_) | RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::EventMsg(_) => None, }) diff --git a/codex-rs/rollout/src/search.rs b/codex-rs/rollout/src/search.rs index 799e92717..0c932ae33 100644 --- a/codex-rs/rollout/src/search.rs +++ b/codex-rs/rollout/src/search.rs @@ -284,6 +284,7 @@ fn conversation_text_from_item(item: &RolloutItem) -> Option { | RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) => None, } } diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index 5b8388bf8..5291b679e 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -22,7 +22,8 @@ pub fn apply_rollout_item( RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx), RolloutItem::EventMsg(event) => apply_event_msg(metadata, event), RolloutItem::ResponseItem(item) => apply_response_item(metadata, item), - RolloutItem::InterAgentCommunication(_) => {} + RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } => {} RolloutItem::Compacted(_) => {} } if metadata.model_provider.is_empty() { @@ -40,6 +41,7 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool { RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) => false, } } diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index e4181d47d..39233789c 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -1163,6 +1163,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option { RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(), RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, diff --git a/codex-rs/thread-store/src/thread_metadata_sync.rs b/codex-rs/thread-store/src/thread_metadata_sync.rs index 852f67912..08ad72696 100644 --- a/codex-rs/thread-store/src/thread_metadata_sync.rs +++ b/codex-rs/thread-store/src/thread_metadata_sync.rs @@ -285,6 +285,7 @@ impl ThreadMetadataSync { | RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) + | RolloutItem::InterAgentCommunicationMetadata { .. } | RolloutItem::Compacted(_) => {} } }