From 120aa07d81ea9f3838ddec31653d1237db11f09d Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 24 Apr 2026 13:39:26 +0200 Subject: [PATCH] Make MultiAgentV2 interruption markers assistant-authored (#19124) ## Why `MultiAgentV2` follow-up messages are delivered to agents as assistant-authored `InterAgentCommunication` envelopes. When `followup_task` used `interrupt: true`, the interrupted-turn guidance was still persisted as a contextual user message, so model-visible history made a system-generated interruption boundary look user-authored. This keeps interruption guidance consistent with the rest of the v2 inter-agent message stream while preserving the legacy marker shape for non-v2 sessions. ## What changed - Make `interrupted_turn_history_marker` feature-aware. - Record the interrupted-turn marker as an assistant `OutputText` message when `Feature::MultiAgentV2` is enabled. - Keep the existing user contextual fragment for non-v2 sessions. - Apply the same feature-aware marker to interrupted fork snapshots. - Add coverage for the live `followup_task` interrupt path and the helper-level v2 marker shape. ## Testing - `cargo test -p codex-core multi_agent_v2_followup_task_interrupts_busy_child_without_losing_message -- --nocapture` - `cargo test -p codex-core multi_agent_v2_interrupted_marker_uses_assistant_output_message -- --nocapture` - `cargo test -p codex-core interrupted_fork_snapshot -- --nocapture` --- codex-rs/core/src/context/turn_aborted.rs | 1 + codex-rs/core/src/tasks/mod.rs | 27 +++++++-- codex-rs/core/src/thread_manager.rs | 24 ++++++-- codex-rs/core/src/thread_manager_tests.rs | 58 +++++++++++++++---- .../src/tools/handlers/multi_agents_tests.rs | 47 +++++++++++++++ 5 files changed, 136 insertions(+), 21 deletions(-) diff --git a/codex-rs/core/src/context/turn_aborted.rs b/codex-rs/core/src/context/turn_aborted.rs index 3cc5f0c21..34c02b9cf 100644 --- a/codex-rs/core/src/context/turn_aborted.rs +++ b/codex-rs/core/src/context/turn_aborted.rs @@ -7,6 +7,7 @@ pub(crate) struct TurnAborted { impl TurnAborted { pub(crate) const INTERRUPTED_GUIDANCE: &'static str = "The user interrupted the previous turn on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed."; + pub(crate) const INTERRUPTED_DEVELOPER_GUIDANCE: &'static str = "The previous turn was interrupted on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed."; pub(crate) fn new(guidance: impl Into) -> Self { Self { diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index d3142bf77..92da8c07f 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -50,6 +50,7 @@ use codex_protocol::protocol::WarningEvent; use codex_protocol::user_input::UserInput; use codex_features::Feature; +use codex_protocol::models::ContentItem; pub(crate) use compact::CompactTask; pub(crate) use ghost_snapshot::GhostSnapshotTask; pub(crate) use regular::RegularTask; @@ -63,10 +64,26 @@ const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100; /// Shared model-visible marker used by both the real interrupt path and /// interrupted fork snapshots. -pub(crate) fn interrupted_turn_history_marker() -> ResponseItem { - ContextualUserFragment::into(crate::context::TurnAborted::new( - crate::context::TurnAborted::INTERRUPTED_GUIDANCE, - )) +pub(crate) fn interrupted_turn_history_marker(multi_agent_v2_enabled: bool) -> ResponseItem { + let guidance = if multi_agent_v2_enabled { + crate::context::TurnAborted::INTERRUPTED_DEVELOPER_GUIDANCE + } else { + crate::context::TurnAborted::INTERRUPTED_GUIDANCE + }; + let marker = crate::context::TurnAborted::new(guidance); + if multi_agent_v2_enabled { + ResponseItem::Message { + id: None, + role: "developer".to_string(), + content: vec![ContentItem::InputText { + text: marker.render(), + }], + end_turn: None, + phase: None, + } + } else { + ContextualUserFragment::into(marker) + } } fn emit_turn_network_proxy_metric( @@ -675,7 +692,7 @@ impl Session { if reason == TurnAbortReason::Interrupted { self.cleanup_after_interrupt(&task.turn_context).await; - let marker = interrupted_turn_history_marker(); + let marker = interrupted_turn_history_marker(self.enabled(Feature::MultiAgentV2)); self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref()) .await; self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)]) diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 2e6ea5f9e..2509f3b0c 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -22,6 +22,7 @@ use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::TurnStatus; use codex_exec_server::EnvironmentManager; +use codex_features::Feature; use codex_login::AuthManager; use codex_login::CodexAuth; use codex_model_provider::create_model_provider; @@ -745,6 +746,7 @@ impl ThreadManager { let snapshot = snapshot.into(); let history = RolloutRecorder::get_rollout_history(&path).await?; let snapshot_state = snapshot_turn_state(&history); + let multi_agent_v2_enabled = config.features.enabled(Feature::MultiAgentV2); let history = match snapshot { ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => { truncate_before_nth_user_message(history, nth_user_message, &snapshot_state) @@ -757,7 +759,11 @@ impl ThreadManager { InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history), }; if snapshot_state.ends_mid_turn { - append_interrupted_boundary(history, snapshot_state.active_turn_id) + append_interrupted_boundary( + history, + snapshot_state.active_turn_id, + multi_agent_v2_enabled, + ) } else { history } @@ -1225,7 +1231,11 @@ fn snapshot_turn_state(history: &InitialHistory) -> SnapshotTurnState { /// Append the same persisted interrupt boundary used by the live interrupt path /// to an existing fork snapshot after the source thread has been confirmed to /// be mid-turn. -fn append_interrupted_boundary(history: InitialHistory, turn_id: Option) -> InitialHistory { +fn append_interrupted_boundary( + history: InitialHistory, + turn_id: Option, + multi_agent_v2_enabled: bool, +) -> InitialHistory { let aborted_event = RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent { turn_id, reason: TurnAbortReason::Interrupted, @@ -1235,18 +1245,22 @@ fn append_interrupted_boundary(history: InitialHistory, turn_id: Option) match history { InitialHistory::New | InitialHistory::Cleared => InitialHistory::Forked(vec![ - RolloutItem::ResponseItem(interrupted_turn_history_marker()), + RolloutItem::ResponseItem(interrupted_turn_history_marker(multi_agent_v2_enabled)), aborted_event, ]), InitialHistory::Forked(mut history) => { - history.push(RolloutItem::ResponseItem(interrupted_turn_history_marker())); + history.push(RolloutItem::ResponseItem(interrupted_turn_history_marker( + multi_agent_v2_enabled, + ))); history.push(aborted_event); InitialHistory::Forked(history) } InitialHistory::Resumed(mut resumed) => { resumed .history - .push(RolloutItem::ResponseItem(interrupted_turn_history_marker())); + .push(RolloutItem::ResponseItem(interrupted_turn_history_marker( + multi_agent_v2_enabled, + ))); resumed.history.push(aborted_event); InitialHistory::Forked(resumed.history) } diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 0ef7afaff..53a09ce84 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -446,12 +446,19 @@ fn interrupted_fork_snapshot_appends_interrupt_boundary() { assert_eq!( serde_json::to_value( - append_interrupted_boundary(committed_history, /*turn_id*/ None).get_rollout_items() + append_interrupted_boundary( + committed_history, + /*turn_id*/ None, + /*multi_agent_v2_enabled*/ false, + ) + .get_rollout_items() ) .expect("serialize interrupted fork history"), serde_json::to_value(vec![ RolloutItem::ResponseItem(user_msg("hello")), - RolloutItem::ResponseItem(interrupted_turn_history_marker()), + RolloutItem::ResponseItem(interrupted_turn_history_marker( + /*multi_agent_v2_enabled*/ false, + )), RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent { turn_id: None, reason: TurnAbortReason::Interrupted, @@ -463,11 +470,18 @@ fn interrupted_fork_snapshot_appends_interrupt_boundary() { ); assert_eq!( serde_json::to_value( - append_interrupted_boundary(InitialHistory::New, /*turn_id*/ None).get_rollout_items() + append_interrupted_boundary( + InitialHistory::New, + /*turn_id*/ None, + /*multi_agent_v2_enabled*/ false, + ) + .get_rollout_items() ) .expect("serialize interrupted empty fork history"), serde_json::to_value(vec![ - RolloutItem::ResponseItem(interrupted_turn_history_marker()), + RolloutItem::ResponseItem(interrupted_turn_history_marker( + /*multi_agent_v2_enabled*/ false, + )), RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent { turn_id: None, reason: TurnAbortReason::Interrupted, @@ -484,7 +498,9 @@ fn interrupted_snapshot_is_not_mid_turn() { let interrupted_history = InitialHistory::Forked(vec![ RolloutItem::ResponseItem(user_msg("hello")), RolloutItem::ResponseItem(assistant_msg("partial")), - RolloutItem::ResponseItem(interrupted_turn_history_marker()), + RolloutItem::ResponseItem(interrupted_turn_history_marker( + /*multi_agent_v2_enabled*/ false, + )), RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent { turn_id: Some("turn-1".to_string()), reason: TurnAbortReason::Interrupted, @@ -503,6 +519,24 @@ fn interrupted_snapshot_is_not_mid_turn() { ); } +#[test] +fn multi_agent_v2_interrupted_marker_uses_developer_input_message() { + let marker = interrupted_turn_history_marker(/*multi_agent_v2_enabled*/ true); + + let ResponseItem::Message { role, content, .. } = marker else { + panic!("expected interrupted marker to be a message"); + }; + assert_eq!(role, "developer"); + assert!( + matches!( + content.as_slice(), + [ContentItem::InputText { text }] + if text.contains(crate::context::TurnAborted::INTERRUPTED_DEVELOPER_GUIDANCE) + ), + "expected interrupted marker to use developer InputText content" + ); +} + #[test] fn completed_legacy_event_history_is_not_mid_turn() { let completed_history = InitialHistory::Forked(vec![ @@ -618,9 +652,10 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor .into_iter() .filter(|item| !matches!(item, RolloutItem::SessionMeta(_))) .collect(); - let interrupted_marker_json = - serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker())) - .expect("serialize interrupted marker"); + let interrupted_marker_json = serde_json::to_value(RolloutItem::ResponseItem( + interrupted_turn_history_marker(/*multi_agent_v2_enabled*/ false), + )) + .expect("serialize interrupted marker"); let interrupted_abort_json = serde_json::to_value(RolloutItem::EventMsg( EventMsg::TurnAborted(TurnAbortedEvent { turn_id: expected_turn_id, @@ -809,9 +844,10 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_ .into_iter() .filter(|item| !matches!(item, RolloutItem::SessionMeta(_))) .collect(); - let interrupted_marker_json = - serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker())) - .expect("serialize interrupted marker"); + let interrupted_marker_json = serde_json::to_value(RolloutItem::ResponseItem( + interrupted_turn_history_marker(/*multi_agent_v2_enabled*/ false), + )) + .expect("serialize interrupted marker"); assert_eq!( forked_rollout_items .iter() diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index eefee4678..82db5f13b 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -3,6 +3,7 @@ use crate::CodexThread; use crate::ThreadManager; use crate::config::AgentRoleConfig; use crate::config::DEFAULT_AGENT_MAX_DEPTH; +use crate::context::TurnAborted; use crate::function_tool::FunctionCallError; use crate::session::tests::make_session_and_context; use crate::session_prefix::format_subagent_notification_message; @@ -1566,6 +1567,52 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa })); wait_for_turn_aborted(&thread, &interrupted_turn_id, TurnAbortReason::Interrupted).await; + let history_items = thread + .codex + .session + .clone_history() + .await + .raw_items() + .to_vec(); + assert!( + history_items.iter().any(|item| matches!( + item, + ResponseItem::Message { role, content, .. } + if role == "developer" + && content.iter().any(|content_item| matches!( + content_item, + ContentItem::InputText { text } + if text.contains(TurnAborted::INTERRUPTED_DEVELOPER_GUIDANCE) + )) + )), + "v2 interrupted-turn marker should be recorded as a developer input message" + ); + assert!( + !history_items.iter().any(|item| matches!( + item, + ResponseItem::Message { role, content, .. } + if role == "user" + && content.iter().any(|content_item| matches!( + content_item, + ContentItem::InputText { text } | ContentItem::OutputText { text } + if text.contains(TurnAborted::INTERRUPTED_GUIDANCE) + )) + )), + "v2 interrupted-turn marker should not be recorded as a user message" + ); + assert!( + !history_items.iter().any(|item| matches!( + item, + ResponseItem::Message { role, content, .. } + if role == "assistant" + && content.iter().any(|content_item| matches!( + content_item, + ContentItem::InputText { text } | ContentItem::OutputText { text } + if text.contains(TurnAborted::INTERRUPTED_DEVELOPER_GUIDANCE) + )) + )), + "v2 interrupted-turn marker should not be recorded as an assistant message" + ); wait_for_redirected_envelope_in_history( &thread, &InterAgentCommunication::new(