mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
Make MultiAgentV2 interruption markers developer-authored (#19124)
## Why

`MultiAgentV2` follow-up messages are delivered to agents as assistant-authored `InterAgentCommunication` envelopes. When `followup_task` used `interrupt: true`, the interrupted-turn guidance was still persisted as a contextual user message, so model-visible history made a system-generated interruption boundary look user-authored.

This change stops attributing that interruption boundary to the user in v2 sessions, while preserving the legacy marker shape for non-v2 sessions.

## What changed

- Make `interrupted_turn_history_marker` feature-aware.
- Record the interrupted-turn marker as a `developer` `InputText` message, carrying the `TurnAborted::INTERRUPTED_DEVELOPER_GUIDANCE` text, when `Feature::MultiAgentV2` is enabled.
- Keep the existing user contextual fragment for non-v2 sessions.
- Apply the same feature-aware marker to interrupted fork snapshots.
- Add coverage for the live `followup_task` interrupt path and the helper-level v2 marker shape.

## Testing

- `cargo test -p codex-core multi_agent_v2_followup_task_interrupts_busy_child_without_losing_message -- --nocapture`
- `cargo test -p codex-core multi_agent_v2_interrupted_marker_uses_developer_input_message -- --nocapture`
- `cargo test -p codex-core interrupted_fork_snapshot -- --nocapture`
This commit is contained in:
committed by
GitHub
Unverified
parent
21463a5074
commit
120aa07d81
@@ -7,6 +7,7 @@ pub(crate) struct TurnAborted {
|
||||
|
||||
impl TurnAborted {
|
||||
pub(crate) const INTERRUPTED_GUIDANCE: &'static str = "The user interrupted the previous turn on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed.";
|
||||
pub(crate) const INTERRUPTED_DEVELOPER_GUIDANCE: &'static str = "The previous turn was interrupted on purpose. Any running unified exec processes may still be running in the background. If any tools/commands were aborted, they may have partially executed.";
|
||||
|
||||
pub(crate) fn new(guidance: impl Into<String>) -> Self {
|
||||
Self {
|
||||
|
||||
@@ -50,6 +50,7 @@ use codex_protocol::protocol::WarningEvent;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::models::ContentItem;
|
||||
pub(crate) use compact::CompactTask;
|
||||
pub(crate) use ghost_snapshot::GhostSnapshotTask;
|
||||
pub(crate) use regular::RegularTask;
|
||||
@@ -63,10 +64,26 @@ const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
|
||||
|
||||
/// Shared model-visible marker used by both the real interrupt path and
|
||||
/// interrupted fork snapshots.
|
||||
pub(crate) fn interrupted_turn_history_marker() -> ResponseItem {
|
||||
ContextualUserFragment::into(crate::context::TurnAborted::new(
|
||||
crate::context::TurnAborted::INTERRUPTED_GUIDANCE,
|
||||
))
|
||||
pub(crate) fn interrupted_turn_history_marker(multi_agent_v2_enabled: bool) -> ResponseItem {
|
||||
let guidance = if multi_agent_v2_enabled {
|
||||
crate::context::TurnAborted::INTERRUPTED_DEVELOPER_GUIDANCE
|
||||
} else {
|
||||
crate::context::TurnAborted::INTERRUPTED_GUIDANCE
|
||||
};
|
||||
let marker = crate::context::TurnAborted::new(guidance);
|
||||
if multi_agent_v2_enabled {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "developer".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: marker.render(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
} else {
|
||||
ContextualUserFragment::into(marker)
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_turn_network_proxy_metric(
|
||||
@@ -675,7 +692,7 @@ impl Session {
|
||||
if reason == TurnAbortReason::Interrupted {
|
||||
self.cleanup_after_interrupt(&task.turn_context).await;
|
||||
|
||||
let marker = interrupted_turn_history_marker();
|
||||
let marker = interrupted_turn_history_marker(self.enabled(Feature::MultiAgentV2));
|
||||
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
|
||||
.await;
|
||||
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::ThreadHistoryBuilder;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_model_provider::create_model_provider;
|
||||
@@ -745,6 +746,7 @@ impl ThreadManager {
|
||||
let snapshot = snapshot.into();
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
let snapshot_state = snapshot_turn_state(&history);
|
||||
let multi_agent_v2_enabled = config.features.enabled(Feature::MultiAgentV2);
|
||||
let history = match snapshot {
|
||||
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => {
|
||||
truncate_before_nth_user_message(history, nth_user_message, &snapshot_state)
|
||||
@@ -757,7 +759,11 @@ impl ThreadManager {
|
||||
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
|
||||
};
|
||||
if snapshot_state.ends_mid_turn {
|
||||
append_interrupted_boundary(history, snapshot_state.active_turn_id)
|
||||
append_interrupted_boundary(
|
||||
history,
|
||||
snapshot_state.active_turn_id,
|
||||
multi_agent_v2_enabled,
|
||||
)
|
||||
} else {
|
||||
history
|
||||
}
|
||||
@@ -1225,7 +1231,11 @@ fn snapshot_turn_state(history: &InitialHistory) -> SnapshotTurnState {
|
||||
/// Append the same persisted interrupt boundary used by the live interrupt path
|
||||
/// to an existing fork snapshot after the source thread has been confirmed to
|
||||
/// be mid-turn.
|
||||
fn append_interrupted_boundary(history: InitialHistory, turn_id: Option<String>) -> InitialHistory {
|
||||
fn append_interrupted_boundary(
|
||||
history: InitialHistory,
|
||||
turn_id: Option<String>,
|
||||
multi_agent_v2_enabled: bool,
|
||||
) -> InitialHistory {
|
||||
let aborted_event = RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id,
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
@@ -1235,18 +1245,22 @@ fn append_interrupted_boundary(history: InitialHistory, turn_id: Option<String>)
|
||||
|
||||
match history {
|
||||
InitialHistory::New | InitialHistory::Cleared => InitialHistory::Forked(vec![
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker(multi_agent_v2_enabled)),
|
||||
aborted_event,
|
||||
]),
|
||||
InitialHistory::Forked(mut history) => {
|
||||
history.push(RolloutItem::ResponseItem(interrupted_turn_history_marker()));
|
||||
history.push(RolloutItem::ResponseItem(interrupted_turn_history_marker(
|
||||
multi_agent_v2_enabled,
|
||||
)));
|
||||
history.push(aborted_event);
|
||||
InitialHistory::Forked(history)
|
||||
}
|
||||
InitialHistory::Resumed(mut resumed) => {
|
||||
resumed
|
||||
.history
|
||||
.push(RolloutItem::ResponseItem(interrupted_turn_history_marker()));
|
||||
.push(RolloutItem::ResponseItem(interrupted_turn_history_marker(
|
||||
multi_agent_v2_enabled,
|
||||
)));
|
||||
resumed.history.push(aborted_event);
|
||||
InitialHistory::Forked(resumed.history)
|
||||
}
|
||||
|
||||
@@ -446,12 +446,19 @@ fn interrupted_fork_snapshot_appends_interrupt_boundary() {
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(
|
||||
append_interrupted_boundary(committed_history, /*turn_id*/ None).get_rollout_items()
|
||||
append_interrupted_boundary(
|
||||
committed_history,
|
||||
/*turn_id*/ None,
|
||||
/*multi_agent_v2_enabled*/ false,
|
||||
)
|
||||
.get_rollout_items()
|
||||
)
|
||||
.expect("serialize interrupted fork history"),
|
||||
serde_json::to_value(vec![
|
||||
RolloutItem::ResponseItem(user_msg("hello")),
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker(
|
||||
/*multi_agent_v2_enabled*/ false,
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: None,
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
@@ -463,11 +470,18 @@ fn interrupted_fork_snapshot_appends_interrupt_boundary() {
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(
|
||||
append_interrupted_boundary(InitialHistory::New, /*turn_id*/ None).get_rollout_items()
|
||||
append_interrupted_boundary(
|
||||
InitialHistory::New,
|
||||
/*turn_id*/ None,
|
||||
/*multi_agent_v2_enabled*/ false,
|
||||
)
|
||||
.get_rollout_items()
|
||||
)
|
||||
.expect("serialize interrupted empty fork history"),
|
||||
serde_json::to_value(vec![
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker(
|
||||
/*multi_agent_v2_enabled*/ false,
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: None,
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
@@ -484,7 +498,9 @@ fn interrupted_snapshot_is_not_mid_turn() {
|
||||
let interrupted_history = InitialHistory::Forked(vec![
|
||||
RolloutItem::ResponseItem(user_msg("hello")),
|
||||
RolloutItem::ResponseItem(assistant_msg("partial")),
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker()),
|
||||
RolloutItem::ResponseItem(interrupted_turn_history_marker(
|
||||
/*multi_agent_v2_enabled*/ false,
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: Some("turn-1".to_string()),
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
@@ -503,6 +519,24 @@ fn interrupted_snapshot_is_not_mid_turn() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multi_agent_v2_interrupted_marker_uses_developer_input_message() {
|
||||
let marker = interrupted_turn_history_marker(/*multi_agent_v2_enabled*/ true);
|
||||
|
||||
let ResponseItem::Message { role, content, .. } = marker else {
|
||||
panic!("expected interrupted marker to be a message");
|
||||
};
|
||||
assert_eq!(role, "developer");
|
||||
assert!(
|
||||
matches!(
|
||||
content.as_slice(),
|
||||
[ContentItem::InputText { text }]
|
||||
if text.contains(crate::context::TurnAborted::INTERRUPTED_DEVELOPER_GUIDANCE)
|
||||
),
|
||||
"expected interrupted marker to use developer InputText content"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn completed_legacy_event_history_is_not_mid_turn() {
|
||||
let completed_history = InitialHistory::Forked(vec![
|
||||
@@ -618,9 +652,10 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
|
||||
.into_iter()
|
||||
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
|
||||
.collect();
|
||||
let interrupted_marker_json =
|
||||
serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker()))
|
||||
.expect("serialize interrupted marker");
|
||||
let interrupted_marker_json = serde_json::to_value(RolloutItem::ResponseItem(
|
||||
interrupted_turn_history_marker(/*multi_agent_v2_enabled*/ false),
|
||||
))
|
||||
.expect("serialize interrupted marker");
|
||||
let interrupted_abort_json = serde_json::to_value(RolloutItem::EventMsg(
|
||||
EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: expected_turn_id,
|
||||
@@ -809,9 +844,10 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
|
||||
.into_iter()
|
||||
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
|
||||
.collect();
|
||||
let interrupted_marker_json =
|
||||
serde_json::to_value(RolloutItem::ResponseItem(interrupted_turn_history_marker()))
|
||||
.expect("serialize interrupted marker");
|
||||
let interrupted_marker_json = serde_json::to_value(RolloutItem::ResponseItem(
|
||||
interrupted_turn_history_marker(/*multi_agent_v2_enabled*/ false),
|
||||
))
|
||||
.expect("serialize interrupted marker");
|
||||
assert_eq!(
|
||||
forked_rollout_items
|
||||
.iter()
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::CodexThread;
|
||||
use crate::ThreadManager;
|
||||
use crate::config::AgentRoleConfig;
|
||||
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
|
||||
use crate::context::TurnAborted;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::session::tests::make_session_and_context;
|
||||
use crate::session_prefix::format_subagent_notification_message;
|
||||
@@ -1566,6 +1567,52 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa
|
||||
}));
|
||||
|
||||
wait_for_turn_aborted(&thread, &interrupted_turn_id, TurnAbortReason::Interrupted).await;
|
||||
let history_items = thread
|
||||
.codex
|
||||
.session
|
||||
.clone_history()
|
||||
.await
|
||||
.raw_items()
|
||||
.to_vec();
|
||||
assert!(
|
||||
history_items.iter().any(|item| matches!(
|
||||
item,
|
||||
ResponseItem::Message { role, content, .. }
|
||||
if role == "developer"
|
||||
&& content.iter().any(|content_item| matches!(
|
||||
content_item,
|
||||
ContentItem::InputText { text }
|
||||
if text.contains(TurnAborted::INTERRUPTED_DEVELOPER_GUIDANCE)
|
||||
))
|
||||
)),
|
||||
"v2 interrupted-turn marker should be recorded as a developer input message"
|
||||
);
|
||||
assert!(
|
||||
!history_items.iter().any(|item| matches!(
|
||||
item,
|
||||
ResponseItem::Message { role, content, .. }
|
||||
if role == "user"
|
||||
&& content.iter().any(|content_item| matches!(
|
||||
content_item,
|
||||
ContentItem::InputText { text } | ContentItem::OutputText { text }
|
||||
if text.contains(TurnAborted::INTERRUPTED_GUIDANCE)
|
||||
))
|
||||
)),
|
||||
"v2 interrupted-turn marker should not be recorded as a user message"
|
||||
);
|
||||
assert!(
|
||||
!history_items.iter().any(|item| matches!(
|
||||
item,
|
||||
ResponseItem::Message { role, content, .. }
|
||||
if role == "assistant"
|
||||
&& content.iter().any(|content_item| matches!(
|
||||
content_item,
|
||||
ContentItem::InputText { text } | ContentItem::OutputText { text }
|
||||
if text.contains(TurnAborted::INTERRUPTED_DEVELOPER_GUIDANCE)
|
||||
))
|
||||
)),
|
||||
"v2 interrupted-turn marker should not be recorded as an assistant message"
|
||||
);
|
||||
wait_for_redirected_envelope_in_history(
|
||||
&thread,
|
||||
&InterAgentCommunication::new(
|
||||
|
||||
Reference in New Issue
Block a user