Persist agent messages as response items (#29829)

## Why

Inter-agent messages are recorded in live history as
`ResponseItem::AgentMessage`, but rollouts stored
`InterAgentCommunication` and rebuilt the response item during resume.
This made the rollout differ from the actual Responses history.

## What changed

- store the prepared `agent_message` response item directly
- keep `trigger_turn` in a small local metadata record for fork
truncation
- keep reading older `inter_agent_communication` rollout items
This commit is contained in:
jif
2026-06-24 15:43:10 +01:00
committed by GitHub
Unverified
parent f959e7fc98
commit b4f0f3eff1
19 changed files with 144 additions and 17 deletions
@@ -386,6 +386,7 @@ impl ThreadHistoryBuilder {
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_) => {}
}
+2 -1
View File
@@ -58,7 +58,8 @@ fn keep_forked_rollout_item(item: &RolloutItem, preserve_reference_context_item:
| ResponseItem::ContextCompaction { .. }
| ResponseItem::Other,
) => false,
RolloutItem::InterAgentCommunication(_) => false,
RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. } => false,
// Full-history forks preserve the cached prompt prefix and can keep diffing
// from the parent's durable baseline. Truncated forks drop part of that prompt,
// so they must rebuild context on their first child turn.
+1
View File
@@ -163,6 +163,7 @@ async fn persisted_originator(thread: &CodexThread) -> String {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.originator.clone()),
RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::EventMsg(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_) => None,
+8 -3
View File
@@ -2841,7 +2841,7 @@ impl Session {
std::slice::from_ref(&response_item),
);
let items = items.as_ref();
communication.id = items.first().and_then(ResponseItem::id).map(str::to_string);
let response_item = items[0].clone();
{
let mut state = self.state.lock().await;
state.record_items(
@@ -2849,8 +2849,13 @@ impl Session {
turn_context.model_info.truncation_policy.into(),
);
}
self.persist_rollout_items(&[RolloutItem::InterAgentCommunication(communication)])
.await;
self.persist_rollout_items(&[
RolloutItem::InterAgentCommunicationMetadata {
trigger_turn: communication.trigger_turn,
},
RolloutItem::ResponseItem(response_item),
])
.await;
self.send_raw_response_items(turn_context, items).await;
}
@@ -264,6 +264,7 @@ impl Session {
active_segment.get_or_insert_with(ActiveReplaySegment::default);
active_segment.counts_as_user_turn = true;
}
RolloutItem::InterAgentCommunicationMetadata { .. } => {}
RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => {}
}
@@ -320,6 +321,7 @@ impl Session {
turn_context.model_info.truncation_policy.into(),
);
}
RolloutItem::InterAgentCommunicationMetadata { .. } => {}
RolloutItem::Compacted(compacted) => {
if let Some(replacement_history) = &compacted.replacement_history {
// This should actually never happen, because the reverse loop above (to build rollout_suffix)
+28 -6
View File
@@ -1766,6 +1766,29 @@ async fn record_inter_agent_communication_sets_turn_id_in_rollout_and_resume() {
else {
panic!("expected resumed rollout history");
};
let persisted_items = resumed
.history
.iter()
.filter(|item| {
matches!(
item,
RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
)
})
.cloned()
.collect::<Vec<_>>();
let expected_persisted_items = vec![
RolloutItem::InterAgentCommunicationMetadata {
trigger_turn: false,
},
RolloutItem::ResponseItem(expected_item.clone()),
];
assert_eq!(
serde_json::to_value(persisted_items).unwrap(),
serde_json::to_value(expected_persisted_items).unwrap()
);
let (resumed_session, _resumed_turn_context) = make_session_and_context().await;
resumed_session
@@ -1818,14 +1841,11 @@ async fn record_inter_agent_communication_preserves_item_id_in_rollout_and_resum
else {
panic!("expected resumed rollout history");
};
let persisted_communication = resumed.history.iter().find_map(|item| match item {
RolloutItem::InterAgentCommunication(communication) => Some(communication),
let persisted_item_id = resumed.history.iter().find_map(|item| match item {
RolloutItem::ResponseItem(item @ ResponseItem::AgentMessage { .. }) => item.id(),
_ => None,
});
assert_eq!(
persisted_communication.and_then(|communication| communication.id.as_deref()),
Some(live_item_id.as_str())
);
assert_eq!(persisted_item_id, Some(live_item_id.as_str()));
let (resumed_session, _resumed_turn_context, _rx) =
make_session_and_context_with_auth_and_config_and_rx(
@@ -2727,6 +2747,7 @@ async fn start_new_context_window_assigns_and_persists_item_ids() {
RolloutItem::SessionMeta(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
});
@@ -2783,6 +2804,7 @@ async fn record_initial_history_assigns_and_persists_id_for_forked_response_item
RolloutItem::ResponseItem(response_item) => response_item.id(),
RolloutItem::SessionMeta(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
+14 -1
View File
@@ -72,7 +72,14 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec<usize
for (idx, item) in items.iter().enumerate() {
match item {
RolloutItem::ResponseItem(item) => {
if is_user_turn_boundary(item) {
let has_delivery_metadata = matches!(item, ResponseItem::AgentMessage { .. })
&& idx.checked_sub(1).is_some_and(|previous_idx| {
matches!(
items.get(previous_idx),
Some(RolloutItem::InterAgentCommunicationMetadata { .. })
)
});
if is_user_turn_boundary(item) && !has_delivery_metadata {
rollback_turn_positions.push(idx);
}
if is_real_user_message_boundary(item) || is_trigger_turn_boundary(item) {
@@ -85,6 +92,12 @@ pub(crate) fn fork_turn_positions_in_rollout(items: &[RolloutItem]) -> Vec<usize
fork_turn_positions.push(idx);
}
}
RolloutItem::InterAgentCommunicationMetadata { trigger_turn } => {
rollback_turn_positions.push(idx);
if *trigger_turn {
fork_turn_positions.push(idx);
}
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
let num_turns = usize::try_from(rollback.num_turns).unwrap_or(usize::MAX);
if num_turns == 0 {
@@ -241,6 +241,46 @@ fn fork_turn_positions_use_inter_agent_delivery_metadata() {
assert_eq!(fork_turn_positions_in_rollout(&rollout), vec![0, 3, 5]);
}
#[test]
fn fork_turn_positions_use_canonical_agent_messages_and_delivery_metadata() {
let queued = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"queued during user turn".to_string(),
/*trigger_turn*/ false,
);
let triggered = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"follow-up task".to_string(),
/*trigger_turn*/ true,
);
let mut rollout = vec![
RolloutItem::ResponseItem(user_msg("user task")),
RolloutItem::InterAgentCommunicationMetadata {
trigger_turn: false,
},
RolloutItem::ResponseItem(queued.to_model_input_item()),
RolloutItem::ResponseItem(assistant_msg("first answer")),
RolloutItem::InterAgentCommunicationMetadata { trigger_turn: true },
RolloutItem::ResponseItem(triggered.to_model_input_item()),
RolloutItem::ResponseItem(assistant_msg("second answer")),
RolloutItem::ResponseItem(user_msg("next user task")),
];
assert_eq!(fork_turn_positions_in_rollout(&rollout), vec![0, 4, 7]);
rollout.insert(
7,
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 1,
})),
);
assert_eq!(fork_turn_positions_in_rollout(&rollout), vec![0, 8]);
}
#[test]
fn truncates_rollout_to_last_n_fork_turns_drops_startup_prefix_even_when_under_limit() {
let rollout = vec![
+22
View File
@@ -412,6 +412,7 @@ mod job {
Some(communication.to_model_input_item())
}
RolloutItem::SessionMeta(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
@@ -790,6 +791,27 @@ mod tests {
assert_eq!(parsed, expected);
}
#[test]
fn serializes_agent_message_response_items_for_memory() {
let communication = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::root().join("worker").expect("agent path"),
Vec::new(),
"delegated task".to_string(),
/*trigger_turn*/ true,
);
let response_item = communication.to_model_input_item();
let serialized = job::serialize_filtered_rollout_response_items(&[
RolloutItem::InterAgentCommunicationMetadata { trigger_turn: true },
RolloutItem::ResponseItem(response_item.clone()),
])
.expect("serialize");
let parsed: Vec<ResponseItem> = serde_json::from_str(&serialized).expect("parse");
assert_eq!(parsed, vec![response_item]);
}
#[test]
fn count_outcomes_sums_token_usage_across_all_jobs() {
let counts = aggregate_stats(vec![
+7 -1
View File
@@ -2599,6 +2599,7 @@ impl InitialHistory {
RolloutItem::SessionMeta(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_)
| RolloutItem::EventMsg(_) => None,
})
@@ -2933,6 +2934,7 @@ fn multi_agent_version_from_items(
RolloutItem::SessionMeta(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_)
| RolloutItem::EventMsg(_) => None,
})
@@ -3080,8 +3082,12 @@ impl<'de> Deserialize<'de> for SessionMetaLine {
pub enum RolloutItem {
SessionMeta(SessionMetaLine),
ResponseItem(ResponseItem),
/// Durable delivery metadata reconstructed as a model-visible `agent_message`.
/// Legacy delivery item reconstructed as a model-visible `agent_message`.
InterAgentCommunication(InterAgentCommunication),
/// Local delivery metadata that is not part of the Responses API item.
InterAgentCommunicationMetadata {
trigger_turn: bool,
},
Compacted(CompactedItem),
TurnContext(TurnContextItem),
EventMsg(EventMsg),
+3 -1
View File
@@ -1151,6 +1151,7 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result<HeadTai
.created_at
.get_or_insert_with(|| rollout_line.timestamp.clone());
}
RolloutItem::InterAgentCommunicationMetadata { .. } => {}
RolloutItem::TurnContext(_) => {
// Not included in `head`; skip.
}
@@ -1213,7 +1214,8 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result<Vec<serde_json::Va
head.push(value);
}
}
RolloutItem::Compacted(_)
RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => {}
}
+2
View File
@@ -69,6 +69,7 @@ pub fn builder_from_items(
RolloutItem::SessionMeta(meta_line) => Some(meta_line),
RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
@@ -123,6 +124,7 @@ pub async fn extract_metadata_from_rollout(
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
@@ -227,6 +227,9 @@ fn rollout_item_type(item: &RolloutItem) -> String {
RolloutItem::SessionMeta(_) => "session_meta".to_string(),
RolloutItem::ResponseItem(item) => response_item_type(item).to_string(),
RolloutItem::InterAgentCommunication(_) => "inter_agent_communication".to_string(),
RolloutItem::InterAgentCommunicationMetadata { .. } => {
"inter_agent_communication_metadata".to_string()
}
RolloutItem::Compacted(_) => "compacted".to_string(),
RolloutItem::TurnContext(_) => "turn_context".to_string(),
RolloutItem::EventMsg(EventMsg::ItemCompleted(event)) => {
+4 -3
View File
@@ -6,7 +6,8 @@ use codex_protocol::models::ResponseItem;
pub fn is_persisted_rollout_item(item: &RolloutItem) -> bool {
match item {
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
RolloutItem::InterAgentCommunication(_) => true,
RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. } => true,
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev),
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
@@ -55,7 +56,8 @@ pub fn should_persist_response_item(item: &ResponseItem) -> bool {
pub fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool {
match item {
ResponseItem::Message { role, .. } => role != "developer",
ResponseItem::LocalShellCall { .. }
ResponseItem::AgentMessage { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::ToolSearchCall { .. }
| ResponseItem::FunctionCallOutput { .. }
@@ -64,7 +66,6 @@ pub fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool {
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::WebSearchCall { .. } => true,
ResponseItem::AdditionalTools { .. }
| ResponseItem::AgentMessage { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::Compaction { .. }
+1
View File
@@ -1856,6 +1856,7 @@ async fn resume_candidate_matches_cwd(
RolloutItem::SessionMeta(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_)
| RolloutItem::EventMsg(_) => None,
})
+1
View File
@@ -284,6 +284,7 @@ fn conversation_text_from_item(item: &RolloutItem) -> Option<String> {
| RolloutItem::EventMsg(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_) => None,
}
}
+3 -1
View File
@@ -22,7 +22,8 @@ pub fn apply_rollout_item(
RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx),
RolloutItem::EventMsg(event) => apply_event_msg(metadata, event),
RolloutItem::ResponseItem(item) => apply_response_item(metadata, item),
RolloutItem::InterAgentCommunication(_) => {}
RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. } => {}
RolloutItem::Compacted(_) => {}
}
if metadata.model_provider.is_empty() {
@@ -40,6 +41,7 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
RolloutItem::EventMsg(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_) => false,
}
}
+1
View File
@@ -1163,6 +1163,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
@@ -285,6 +285,7 @@ impl ThreadMetadataSync {
| RolloutItem::EventMsg(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::InterAgentCommunication(_)
| RolloutItem::InterAgentCommunicationMetadata { .. }
| RolloutItem::Compacted(_) => {}
}
}