feat: new op type for sub-agents communication (#15556)

Add `InterAgentCommunication` for v2 agent communication
This commit is contained in:
jif-oai
2026-03-23 21:09:00 +00:00
committed by GitHub
Unverified
parent 7eb9e75b86
commit 18f1a08bc9
11 changed files with 145 additions and 141 deletions
+4 -6
View File
@@ -1,6 +1,4 @@
use crate::agent::AgentStatus;
use crate::agent::inter_agent_instruction::InterAgentDelivery;
use crate::agent::inter_agent_instruction::InterAgentInstruction;
use crate::agent::registry::AgentMetadata;
use crate::agent::registry::AgentRegistry;
use crate::agent::role::DEFAULT_ROLE_NAME;
@@ -23,6 +21,7 @@ use codex_protocol::ThreadId;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
@@ -488,18 +487,17 @@ impl AgentControl {
.await
}
pub(crate) async fn deliver_inter_agent_instruction(
pub(crate) async fn send_inter_agent_communication(
&self,
agent_id: ThreadId,
instruction: InterAgentInstruction,
delivery: InterAgentDelivery,
communication: InterAgentCommunication,
) -> CodexResult<String> {
let state = self.upgrade()?;
self.handle_thread_request_result(
agent_id,
&state,
state
.deliver_inter_agent_instruction(agent_id, instruction, delivery)
.send_op(agent_id, Op::InterAgentCommunication { communication })
.await,
)
.await
@@ -1,74 +0,0 @@
use codex_protocol::AgentPath;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum InterAgentDelivery {
CurrentTurn,
NextTurn,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct InterAgentInstruction {
author: AgentPath,
recipient: AgentPath,
other_recipients: Vec<AgentPath>,
content: String,
}
impl InterAgentInstruction {
pub(crate) fn new(
author: AgentPath,
recipient: AgentPath,
other_recipients: Vec<AgentPath>,
content: String,
) -> Self {
Self {
author,
recipient,
other_recipients,
content,
}
}
pub(crate) fn to_response_item(&self) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: self.as_text(),
}],
end_turn: None,
phase: None,
}
}
pub(crate) fn is_message_content(content: &[ContentItem]) -> bool {
content.iter().any(|content_item| match content_item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
Self::is_instruction_text(text)
}
_ => false,
})
}
fn as_text(&self) -> String {
let other_recipients = self
.other_recipients
.iter()
.map(std::string::ToString::to_string)
.collect::<Vec<_>>()
.join(", ");
format!(
"author: {}\nrecipient: {}\nother_recipients: [{other_recipients}]\nContent: {}",
self.author, self.recipient, self.content
)
}
fn is_instruction_text(text: &str) -> bool {
text.starts_with("author: ")
&& text.contains("\nrecipient: ")
&& text.contains("\nother_recipients: [")
&& text.contains("]\nContent: ")
}
}
-1
View File
@@ -1,6 +1,5 @@
pub(crate) mod agent_resolver;
pub(crate) mod control;
pub(crate) mod inter_agent_instruction;
mod registry;
pub(crate) mod role;
pub(crate) mod status;
+28
View File
@@ -4306,6 +4306,10 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op).await;
false
}
Op::InterAgentCommunication { communication } => {
handlers::inter_agent_communication(&sess, sub.id.clone(), communication).await;
false
}
Op::ExecApproval {
id: approval_id,
turn_id,
@@ -4485,6 +4489,7 @@ mod handlers {
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::ListCustomPromptsResponseEvent;
use codex_protocol::protocol::ListSkillsResponseEvent;
use codex_protocol::protocol::McpServerRefreshConfig;
@@ -4627,6 +4632,29 @@ mod handlers {
}
}
pub async fn inter_agent_communication(
sess: &Arc<Session>,
sub_id: String,
communication: InterAgentCommunication,
) {
let pending_item = communication.to_response_input_item();
if sess
.inject_response_items(vec![pending_item.clone()])
.await
.is_ok()
{
return;
}
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
sess.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref())
.await;
sess.queue_response_items_for_next_turn(vec![pending_item])
.await;
sess.spawn_task(turn_context, Vec::new(), crate::tasks::RegularTask::new())
.await;
}
pub async fn run_user_shell_command(sess: &Arc<Session>, sub_id: String, command: String) {
if let Some((turn_context, cancellation_token)) =
sess.active_turn_context_and_cancellation_token().await
+1 -32
View File
@@ -1,6 +1,4 @@
use crate::agent::AgentStatus;
use crate::agent::inter_agent_instruction::InterAgentDelivery;
use crate::agent::inter_agent_instruction::InterAgentInstruction;
use crate::codex::Codex;
use crate::codex::SteerInputError;
use crate::config::ConstraintResult;
@@ -158,6 +156,7 @@ impl CodexThread {
/// If the thread already has an active turn, the message is queued as pending input for that
/// turn. Otherwise it is queued at session scope and a regular turn is started so the agent
/// can consume that pending input through the normal turn pipeline.
#[cfg(test)]
pub(crate) async fn append_message(&self, message: ResponseItem) -> CodexResult<String> {
let submission_id = uuid::Uuid::new_v4().to_string();
let pending_item = pending_message_input_item(&message)?;
@@ -180,36 +179,6 @@ impl CodexThread {
Ok(submission_id)
}
pub(crate) async fn deliver_inter_agent_instruction(
&self,
instruction: InterAgentInstruction,
delivery: InterAgentDelivery,
) -> CodexResult<String> {
let message = instruction.to_response_item();
match delivery {
InterAgentDelivery::CurrentTurn => self.append_message(message).await,
InterAgentDelivery::NextTurn => self.queue_message_for_next_turn(message).await,
}
}
/// Queue a prebuilt message so the next turn records it before any submitted user input.
pub(crate) async fn queue_message_for_next_turn(
&self,
message: ResponseItem,
) -> CodexResult<String> {
let submission_id = uuid::Uuid::new_v4().to_string();
let pending_item = pending_message_input_item(&message)?;
self.codex
.session
.queue_response_items_for_next_turn(vec![pending_item])
.await;
self.codex
.session
.ensure_task_for_queued_response_items()
.await;
Ok(submission_id)
}
pub fn rollout_path(&self) -> Option<PathBuf> {
self.rollout_path.clone()
}
+2 -2
View File
@@ -1,4 +1,3 @@
use crate::agent::inter_agent_instruction::InterAgentInstruction;
use crate::codex::TurnContext;
use crate::context_manager::normalize;
use crate::event_mapping::is_contextual_user_message_content;
@@ -18,6 +17,7 @@ use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ImageDetail;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::InputModality;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::TurnContextItem;
@@ -638,7 +638,7 @@ pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool {
}
fn is_inter_agent_instruction_content(content: &[ContentItem]) -> bool {
InterAgentInstruction::is_message_content(content)
InterAgentCommunication::is_message_content(content)
}
fn user_message_positions(items: &[ResponseItem]) -> Vec<usize> {
+1
View File
@@ -405,6 +405,7 @@ impl Session {
turn.add_task(task);
*active = Some(turn);
}
async fn take_active_turn(&self) -> Option<ActiveTurn> {
let mut active = self.active_turn.lock().await;
active.take()
-14
View File
@@ -3,8 +3,6 @@ use crate::CodexAuth;
use crate::ModelProviderInfo;
use crate::OPENAI_PROVIDER_ID;
use crate::agent::AgentControl;
use crate::agent::inter_agent_instruction::InterAgentDelivery;
use crate::agent::inter_agent_instruction::InterAgentInstruction;
use crate::codex::Codex;
use crate::codex::CodexSpawnArgs;
use crate::codex::CodexSpawnOk;
@@ -621,18 +619,6 @@ impl ThreadManagerState {
thread.append_message(message).await
}
pub(crate) async fn deliver_inter_agent_instruction(
&self,
thread_id: ThreadId,
instruction: InterAgentInstruction,
delivery: InterAgentDelivery,
) -> CodexResult<String> {
let thread = self.get_thread(thread_id).await?;
thread
.deliver_inter_agent_instruction(instruction, delivery)
.await
}
/// Remove a thread from the manager by ID, returning it when present.
pub(crate) async fn remove_thread(&self, thread_id: &ThreadId) -> Option<Arc<CodexThread>> {
self.threads.write().await.remove(thread_id)
@@ -24,6 +24,7 @@ use crate::tools::handlers::multi_agents_v2::SpawnAgentHandler as SpawnAgentHand
use crate::tools::handlers::multi_agents_v2::WaitAgentHandler as WaitAgentHandlerV2;
use crate::turn_diff_tracker::TurnDiffTracker;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
@@ -373,6 +374,18 @@ async fn multi_agent_v2_spawn_returns_path_and_send_input_accepts_relative_path(
.await
.expect("send_input should accept v2 path");
assert!(manager.captured_ops().iter().any(|(id, op)| {
*id == child_thread_id
&& matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == AgentPath::root()
&& communication.recipient.as_str() == "/root/test_process"
&& communication.other_recipients.is_empty()
&& communication.content == "continue"
)
}));
let child_thread = manager
.get_thread(child_thread_id)
.await
@@ -618,6 +631,16 @@ async fn multi_agent_v2_send_input_interrupts_busy_child_without_losing_message(
.filter_map(|(id, op)| (*id == agent_id).then_some(op))
.collect();
assert!(ops_for_agent.iter().any(|op| matches!(op, Op::Interrupt)));
assert!(ops_for_agent.iter().any(|op| {
matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == AgentPath::root()
&& communication.recipient.as_str() == "/root/worker"
&& communication.other_recipients.is_empty()
&& communication.content == "continue"
)
}));
assert!(!ops_for_agent.iter().any(|op| matches!(
op,
Op::UserInput { items, .. }
@@ -1,6 +1,5 @@
use super::*;
use crate::agent::inter_agent_instruction::InterAgentDelivery;
use crate::agent::inter_agent_instruction::InterAgentInstruction;
use codex_protocol::protocol::InterAgentCommunication;
pub(crate) struct Handler;
@@ -66,7 +65,7 @@ impl ToolHandler for Handler {
"target agent is missing an agent_path".to_string(),
)
})?;
let instruction = InterAgentInstruction::new(
let communication = InterAgentCommunication::new(
turn.session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root),
@@ -77,15 +76,7 @@ impl ToolHandler for Handler {
session
.services
.agent_control
.deliver_inter_agent_instruction(
receiver_thread_id,
instruction,
if args.interrupt {
InterAgentDelivery::NextTurn
} else {
InterAgentDelivery::CurrentTurn
},
)
.send_inter_agent_communication(receiver_thread_id, communication)
.await
} else {
session
+83
View File
@@ -38,6 +38,7 @@ use crate::message_history::HistoryEntry;
use crate::models::BaseInstructions;
use crate::models::ContentItem;
use crate::models::MessagePhase;
use crate::models::ResponseInputItem;
use crate::models::ResponseItem;
use crate::models::WebSearchAction;
use crate::num_format::format_with_separators;
@@ -293,6 +294,12 @@ pub enum Op {
personality: Option<Personality>,
},
/// Inter-agent communication that should be recorded as assistant history
/// while still using the normal thread submission lifecycle.
InterAgentCommunication {
communication: InterAgentCommunication,
},
/// Override parts of the persistent turn context for subsequent turns.
///
/// All fields are optional; when omitted, the existing value is preserved.
@@ -499,6 +506,81 @@ pub enum Op {
ListModels,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema, TS)]
pub struct InterAgentCommunication {
pub author: AgentPath,
pub recipient: AgentPath,
#[serde(default)]
pub other_recipients: Vec<AgentPath>,
pub content: String,
}
impl InterAgentCommunication {
pub fn new(
author: AgentPath,
recipient: AgentPath,
other_recipients: Vec<AgentPath>,
content: String,
) -> Self {
Self {
author,
recipient,
other_recipients,
content,
}
}
pub fn to_response_item(&self) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: self.as_text(),
}],
end_turn: None,
phase: None,
}
}
pub fn to_response_input_item(&self) -> ResponseInputItem {
ResponseInputItem::Message {
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: self.as_text(),
}],
}
}
pub fn is_message_content(content: &[ContentItem]) -> bool {
content.iter().any(|content_item| match content_item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
Self::is_instruction_text(text)
}
_ => false,
})
}
fn as_text(&self) -> String {
let other_recipients = self
.other_recipients
.iter()
.map(std::string::ToString::to_string)
.collect::<Vec<_>>()
.join(", ");
format!(
"author: {}\nrecipient: {}\nother_recipients: [{other_recipients}]\nContent: {}",
self.author, self.recipient, self.content
)
}
fn is_instruction_text(text: &str) -> bool {
text.starts_with("author: ")
&& text.contains("\nrecipient: ")
&& text.contains("\nother_recipients: [")
&& text.contains("]\nContent: ")
}
}
impl Op {
pub fn kind(&self) -> &'static str {
match self {
@@ -510,6 +592,7 @@ impl Op {
Self::RealtimeConversationClose => "realtime_conversation_close",
Self::UserInput { .. } => "user_input",
Self::UserTurn { .. } => "user_turn",
Self::InterAgentCommunication { .. } => "inter_agent_communication",
Self::OverrideTurnContext { .. } => "override_turn_context",
Self::ExecApproval { .. } => "exec_approval",
Self::PatchApproval { .. } => "patch_approval",