From 18f1a08bc9c6e39331d9cf34ee240ea0124173cb Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 23 Mar 2026 21:09:00 +0000 Subject: [PATCH] feat: new op type for sub-agents communication (#15556) Add `InterAgentCommunication` for v2 agent communication --- codex-rs/core/src/agent/control.rs | 10 +-- .../core/src/agent/inter_agent_instruction.rs | 74 ----------------- codex-rs/core/src/agent/mod.rs | 1 - codex-rs/core/src/codex.rs | 28 +++++++ codex-rs/core/src/codex_thread.rs | 33 +------- codex-rs/core/src/context_manager/history.rs | 4 +- codex-rs/core/src/tasks/mod.rs | 1 + codex-rs/core/src/thread_manager.rs | 14 ---- .../src/tools/handlers/multi_agents_tests.rs | 23 +++++ .../handlers/multi_agents_v2/send_input.rs | 15 +--- codex-rs/protocol/src/protocol.rs | 83 +++++++++++++++++++ 11 files changed, 145 insertions(+), 141 deletions(-) delete mode 100644 codex-rs/core/src/agent/inter_agent_instruction.rs diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 5bc5a0711..3bdcc2efd 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -1,6 +1,4 @@ use crate::agent::AgentStatus; -use crate::agent::inter_agent_instruction::InterAgentDelivery; -use crate::agent::inter_agent_instruction::InterAgentInstruction; use crate::agent::registry::AgentMetadata; use crate::agent::registry::AgentRegistry; use crate::agent::role::DEFAULT_ROLE_NAME; @@ -23,6 +21,7 @@ use codex_protocol::ThreadId; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::InitialHistory; +use codex_protocol::protocol::InterAgentCommunication; use codex_protocol::protocol::Op; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; @@ -488,18 +487,17 @@ impl AgentControl { .await } - pub(crate) async fn deliver_inter_agent_instruction( + pub(crate) async fn send_inter_agent_communication( &self, agent_id: ThreadId, - instruction: InterAgentInstruction, - delivery: InterAgentDelivery, + communication: InterAgentCommunication, ) -> CodexResult { let state = self.upgrade()?; self.handle_thread_request_result( agent_id, &state, state - .deliver_inter_agent_instruction(agent_id, instruction, delivery) + .send_op(agent_id, Op::InterAgentCommunication { communication }) .await, ) .await diff --git a/codex-rs/core/src/agent/inter_agent_instruction.rs b/codex-rs/core/src/agent/inter_agent_instruction.rs deleted file mode 100644 index 0eff40beb..000000000 --- a/codex-rs/core/src/agent/inter_agent_instruction.rs +++ /dev/null @@ -1,74 +0,0 @@ -use codex_protocol::AgentPath; -use codex_protocol::models::ContentItem; -use codex_protocol::models::ResponseItem; - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub(crate) enum InterAgentDelivery { - CurrentTurn, - NextTurn, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub(crate) struct InterAgentInstruction { - author: AgentPath, - recipient: AgentPath, - other_recipients: Vec, - content: String, -} - -impl InterAgentInstruction { - pub(crate) fn new( - author: AgentPath, - recipient: AgentPath, - other_recipients: Vec, - content: String, - ) -> Self { - Self { - author, - recipient, - other_recipients, - content, - } - } - - pub(crate) fn to_response_item(&self) -> ResponseItem { - ResponseItem::Message { - id: None, - role: "assistant".to_string(), - content: vec![ContentItem::OutputText { - text: self.as_text(), - }], - end_turn: None, - phase: None, - } - } - - pub(crate) fn is_message_content(content: &[ContentItem]) -> bool { - content.iter().any(|content_item| match content_item { - ContentItem::InputText { text } | ContentItem::OutputText { text } => { - Self::is_instruction_text(text) - } - _ => false, - }) - } - - fn as_text(&self) -> String { - let other_recipients = self - .other_recipients - .iter() - .map(std::string::ToString::to_string) - .collect::>() - .join(", "); - format!( - "author: {}\nrecipient: {}\nother_recipients: [{other_recipients}]\nContent: {}", - self.author, self.recipient, self.content - ) - } - - fn is_instruction_text(text: &str) -> bool { - text.starts_with("author: ") - && text.contains("\nrecipient: ") - && text.contains("\nother_recipients: [") - && text.contains("]\nContent: ") - } -} diff --git a/codex-rs/core/src/agent/mod.rs b/codex-rs/core/src/agent/mod.rs index 3f14e5c59..350962dc0 100644 --- a/codex-rs/core/src/agent/mod.rs +++ b/codex-rs/core/src/agent/mod.rs @@ -1,6 +1,5 @@ pub(crate) mod agent_resolver; pub(crate) mod control; -pub(crate) mod inter_agent_instruction; mod registry; pub(crate) mod role; pub(crate) mod status; diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index b040883c2..9b8102552 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4306,6 +4306,10 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op).await; false } + Op::InterAgentCommunication { communication } => { + handlers::inter_agent_communication(&sess, sub.id.clone(), communication).await; + false + } Op::ExecApproval { id: approval_id, turn_id, @@ -4485,6 +4489,7 @@ mod handlers { use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; + use codex_protocol::protocol::InterAgentCommunication; use codex_protocol::protocol::ListCustomPromptsResponseEvent; use codex_protocol::protocol::ListSkillsResponseEvent; use codex_protocol::protocol::McpServerRefreshConfig; @@ -4627,6 +4632,29 @@ mod handlers { } } + pub async fn inter_agent_communication( + sess: &Arc, + sub_id: String, + communication: InterAgentCommunication, + ) { + let pending_item = communication.to_response_input_item(); + if sess + .inject_response_items(vec![pending_item.clone()]) + .await + .is_ok() + { + return; + } + + let turn_context = sess.new_default_turn_with_sub_id(sub_id).await; + sess.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref()) + .await; + sess.queue_response_items_for_next_turn(vec![pending_item]) + .await; + sess.spawn_task(turn_context, Vec::new(), crate::tasks::RegularTask::new()) + .await; + } + pub async fn run_user_shell_command(sess: &Arc, sub_id: String, command: String) { if let Some((turn_context, cancellation_token)) = sess.active_turn_context_and_cancellation_token().await diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index ff36de6c1..9fa706911 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -1,6 +1,4 @@ use crate::agent::AgentStatus; -use crate::agent::inter_agent_instruction::InterAgentDelivery; -use crate::agent::inter_agent_instruction::InterAgentInstruction; use crate::codex::Codex; use crate::codex::SteerInputError; use crate::config::ConstraintResult; @@ -158,6 +156,7 @@ impl CodexThread { /// If the thread already has an active turn, the message is queued as pending input for that /// turn. Otherwise it is queued at session scope and a regular turn is started so the agent /// can consume that pending input through the normal turn pipeline. + #[cfg(test)] pub(crate) async fn append_message(&self, message: ResponseItem) -> CodexResult { let submission_id = uuid::Uuid::new_v4().to_string(); let pending_item = pending_message_input_item(&message)?; @@ -180,36 +179,6 @@ impl CodexThread { Ok(submission_id) } - pub(crate) async fn deliver_inter_agent_instruction( - &self, - instruction: InterAgentInstruction, - delivery: InterAgentDelivery, - ) -> CodexResult { - let message = instruction.to_response_item(); - match delivery { - InterAgentDelivery::CurrentTurn => self.append_message(message).await, - InterAgentDelivery::NextTurn => self.queue_message_for_next_turn(message).await, - } - } - - /// Queue a prebuilt message so the next turn records it before any submitted user input. - pub(crate) async fn queue_message_for_next_turn( - &self, - message: ResponseItem, - ) -> CodexResult { - let submission_id = uuid::Uuid::new_v4().to_string(); - let pending_item = pending_message_input_item(&message)?; - self.codex - .session - .queue_response_items_for_next_turn(vec![pending_item]) - .await; - self.codex - .session - .ensure_task_for_queued_response_items() - .await; - Ok(submission_id) - } - pub fn rollout_path(&self) -> Option { self.rollout_path.clone() } diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 6ed037048..57807e5f8 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -1,4 +1,3 @@ -use crate::agent::inter_agent_instruction::InterAgentInstruction; use crate::codex::TurnContext; use crate::context_manager::normalize; use crate::event_mapping::is_contextual_user_message_content; @@ -18,6 +17,7 @@ use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ImageDetail; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::InputModality; +use codex_protocol::protocol::InterAgentCommunication; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsageInfo; use codex_protocol::protocol::TurnContextItem; @@ -638,7 +638,7 @@ pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool { } fn is_inter_agent_instruction_content(content: &[ContentItem]) -> bool { - InterAgentInstruction::is_message_content(content) + InterAgentCommunication::is_message_content(content) } fn user_message_positions(items: &[ResponseItem]) -> Vec { diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index b2f110486..d9c4954cc 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -405,6 +405,7 @@ impl Session { turn.add_task(task); *active = Some(turn); } + async fn take_active_turn(&self) -> Option { let mut active = self.active_turn.lock().await; active.take() diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index d93bf3718..185a32a70 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -3,8 +3,6 @@ use crate::CodexAuth; use crate::ModelProviderInfo; use crate::OPENAI_PROVIDER_ID; use crate::agent::AgentControl; -use crate::agent::inter_agent_instruction::InterAgentDelivery; -use crate::agent::inter_agent_instruction::InterAgentInstruction; use crate::codex::Codex; use crate::codex::CodexSpawnArgs; use crate::codex::CodexSpawnOk; @@ -621,18 +619,6 @@ impl ThreadManagerState { thread.append_message(message).await } - pub(crate) async fn deliver_inter_agent_instruction( - &self, - thread_id: ThreadId, - instruction: InterAgentInstruction, - delivery: InterAgentDelivery, - ) -> CodexResult { - let thread = self.get_thread(thread_id).await?; - thread - .deliver_inter_agent_instruction(instruction, delivery) - .await - } - /// Remove a thread from the manager by ID, returning it when present. pub(crate) async fn remove_thread(&self, thread_id: &ThreadId) -> Option> { self.threads.write().await.remove(thread_id) diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index f60137f7f..7b8471f03 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -24,6 +24,7 @@ use crate::tools::handlers::multi_agents_v2::SpawnAgentHandler as SpawnAgentHand use crate::tools::handlers::multi_agents_v2::WaitAgentHandler as WaitAgentHandlerV2; use crate::turn_diff_tracker::TurnDiffTracker; use codex_features::Feature; +use codex_protocol::AgentPath; use codex_protocol::ThreadId; use codex_protocol::models::BaseInstructions; use codex_protocol::models::ContentItem; @@ -373,6 +374,18 @@ async fn multi_agent_v2_spawn_returns_path_and_send_input_accepts_relative_path( .await .expect("send_input should accept v2 path"); + assert!(manager.captured_ops().iter().any(|(id, op)| { + *id == child_thread_id + && matches!( + op, + Op::InterAgentCommunication { communication } + if communication.author == AgentPath::root() + && communication.recipient.as_str() == "/root/test_process" + && communication.other_recipients.is_empty() + && communication.content == "continue" + ) + })); + let child_thread = manager .get_thread(child_thread_id) .await @@ -618,6 +631,16 @@ async fn multi_agent_v2_send_input_interrupts_busy_child_without_losing_message( .filter_map(|(id, op)| (*id == agent_id).then_some(op)) .collect(); assert!(ops_for_agent.iter().any(|op| matches!(op, Op::Interrupt))); + assert!(ops_for_agent.iter().any(|op| { + matches!( + op, + Op::InterAgentCommunication { communication } + if communication.author == AgentPath::root() + && communication.recipient.as_str() == "/root/worker" + && communication.other_recipients.is_empty() + && communication.content == "continue" + ) + })); assert!(!ops_for_agent.iter().any(|op| matches!( op, Op::UserInput { items, .. } diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/send_input.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/send_input.rs index 8c29a7aec..c17e12cca 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/send_input.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/send_input.rs @@ -1,6 +1,5 @@ use super::*; -use crate::agent::inter_agent_instruction::InterAgentDelivery; -use crate::agent::inter_agent_instruction::InterAgentInstruction; +use codex_protocol::protocol::InterAgentCommunication; pub(crate) struct Handler; @@ -66,7 +65,7 @@ impl ToolHandler for Handler { "target agent is missing an agent_path".to_string(), ) })?; - let instruction = InterAgentInstruction::new( + let communication = InterAgentCommunication::new( turn.session_source .get_agent_path() .unwrap_or_else(AgentPath::root), @@ -77,15 +76,7 @@ impl ToolHandler for Handler { session .services .agent_control - .deliver_inter_agent_instruction( - receiver_thread_id, - instruction, - if args.interrupt { - InterAgentDelivery::NextTurn - } else { - InterAgentDelivery::CurrentTurn - }, - ) + .send_inter_agent_communication(receiver_thread_id, communication) .await } else { session diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index c88c4ecee..c8af844f7 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -38,6 +38,7 @@ use crate::message_history::HistoryEntry; use crate::models::BaseInstructions; use crate::models::ContentItem; use crate::models::MessagePhase; +use crate::models::ResponseInputItem; use crate::models::ResponseItem; use crate::models::WebSearchAction; use crate::num_format::format_with_separators; @@ -293,6 +294,12 @@ pub enum Op { personality: Option, }, + /// Inter-agent communication that should be recorded as assistant history + /// while still using the normal thread submission lifecycle. + InterAgentCommunication { + communication: InterAgentCommunication, + }, + /// Override parts of the persistent turn context for subsequent turns. /// /// All fields are optional; when omitted, the existing value is preserved. @@ -499,6 +506,81 @@ pub enum Op { ListModels, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema, TS)] +pub struct InterAgentCommunication { + pub author: AgentPath, + pub recipient: AgentPath, + #[serde(default)] + pub other_recipients: Vec, + pub content: String, +} + +impl InterAgentCommunication { + pub fn new( + author: AgentPath, + recipient: AgentPath, + other_recipients: Vec, + content: String, + ) -> Self { + Self { + author, + recipient, + other_recipients, + content, + } + } + + pub fn to_response_item(&self) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: self.as_text(), + }], + end_turn: None, + phase: None, + } + } + + pub fn to_response_input_item(&self) -> ResponseInputItem { + ResponseInputItem::Message { + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: self.as_text(), + }], + } + } + + pub fn is_message_content(content: &[ContentItem]) -> bool { + content.iter().any(|content_item| match content_item { + ContentItem::InputText { text } | ContentItem::OutputText { text } => { + Self::is_instruction_text(text) + } + _ => false, + }) + } + + fn as_text(&self) -> String { + let other_recipients = self + .other_recipients + .iter() + .map(std::string::ToString::to_string) + .collect::>() + .join(", "); + format!( + "author: {}\nrecipient: {}\nother_recipients: [{other_recipients}]\nContent: {}", + self.author, self.recipient, self.content + ) + } + + fn is_instruction_text(text: &str) -> bool { + text.starts_with("author: ") + && text.contains("\nrecipient: ") + && text.contains("\nother_recipients: [") + && text.contains("]\nContent: ") + } +} + impl Op { pub fn kind(&self) -> &'static str { match self { @@ -510,6 +592,7 @@ impl Op { Self::RealtimeConversationClose => "realtime_conversation_close", Self::UserInput { .. } => "user_input", Self::UserTurn { .. } => "user_turn", + Self::InterAgentCommunication { .. } => "inter_agent_communication", Self::OverrideTurnContext { .. } => "override_turn_context", Self::ExecApproval { .. } => "exec_approval", Self::PatchApproval { .. } => "patch_approval",