diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 86105c67f..17f670106 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -293,6 +293,7 @@ use crate::shell_snapshot::ShellSnapshot; use crate::skills_watcher::SkillsWatcher; use crate::skills_watcher::SkillsWatcherEvent; use crate::state::ActiveTurn; +use crate::state::MailboxDeliveryPhase; use crate::state::SessionServices; use crate::state::SessionState; use crate::tasks::GhostSnapshotTask; @@ -4046,6 +4047,16 @@ impl Session { } pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) { + self.set_mailbox_delivery_phase(sub_id, MailboxDeliveryPhase::NextTurn) + .await; + } + + pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) { + self.set_mailbox_delivery_phase(sub_id, MailboxDeliveryPhase::CurrentTurn) + .await; + } + + async fn set_mailbox_delivery_phase(&self, sub_id: &str, phase: MailboxDeliveryPhase) { let turn_state = { let active = self.active_turn.lock().await; active.as_ref().and_then(|active_turn| { @@ -4058,10 +4069,7 @@ impl Session { let Some(turn_state) = turn_state else { return; }; - turn_state - .lock() - .await - .defer_mailbox_delivery_to_next_turn(); + turn_state.lock().await.set_mailbox_delivery_phase(phase); } pub(crate) fn subscribe_mailbox_seq(&self) -> watch::Receiver { diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 441095a5a..f7d343a81 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -4896,6 +4896,55 @@ async fn steered_input_reopens_mailbox_delivery_for_current_turn() { ); } +#[tokio::test] +async fn tool_calls_reopen_mailbox_delivery_for_current_turn() { + let (sess, tc, _rx) = make_session_and_context_with_rx().await; + let communication = InterAgentCommunication::new( + AgentPath::try_from("/root/worker").expect("worker path should parse"), + AgentPath::root(), + Vec::new(), + "queued child update".to_string(), + /*trigger_turn*/ false, + ); + sess.spawn_task( + Arc::clone(&tc), + Vec::new(), + NeverEndingTask { + kind: TaskKind::Regular, + listen_to_cancellation_token: true, + }, + ) + .await; + + sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; + sess.enqueue_mailbox_communication(communication.clone()); + + let item = ResponseItem::FunctionCall { + id: None, + name: "test_tool".to_string(), + namespace: None, + arguments: "{}".to_string(), + call_id: "call-1".to_string(), + }; + let mut ctx = HandleOutputCtx { + sess: Arc::clone(&sess), + turn_context: Arc::clone(&tc), + tool_runtime: test_tool_runtime(Arc::clone(&sess), Arc::clone(&tc)), + cancellation_token: CancellationToken::new(), + }; + + let output = handle_output_item_done(&mut ctx, item, /*previously_active_item*/ None) + .await + .expect("tool call should be handled"); + + assert!(output.needs_follow_up); + assert!(output.tool_future.is_some()); + assert_eq!( + sess.get_pending_input().await, + vec![communication.to_response_input_item()], + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn abort_review_task_emits_exited_then_aborted_and_records_history() { let (sess, tc, rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/state/mod.rs b/codex-rs/core/src/state/mod.rs index 642433a78..af318d801 100644 --- a/codex-rs/core/src/state/mod.rs +++ b/codex-rs/core/src/state/mod.rs @@ -5,5 +5,6 @@ mod turn; pub(crate) use service::SessionServices; pub(crate) use session::SessionState; pub(crate) use turn::ActiveTurn; +pub(crate) use turn::MailboxDeliveryPhase; pub(crate) use turn::RunningTask; pub(crate) use turn::TaskKind; diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 3cc6200a3..223edbe7a 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -215,17 +215,21 @@ impl TurnState { } pub(crate) fn defer_mailbox_delivery_to_next_turn(&mut self) { - self.mailbox_delivery_phase = MailboxDeliveryPhase::NextTurn; + self.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn); } pub(crate) fn accept_mailbox_delivery_for_current_turn(&mut self) { - self.mailbox_delivery_phase = MailboxDeliveryPhase::CurrentTurn; + self.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn); } pub(crate) fn accepts_mailbox_delivery_for_current_turn(&self) -> bool { self.mailbox_delivery_phase == MailboxDeliveryPhase::CurrentTurn } + pub(crate) fn set_mailbox_delivery_phase(&mut self, phase: MailboxDeliveryPhase) { + self.mailbox_delivery_phase = phase; + } + pub(crate) fn record_granted_permissions(&mut self, permissions: PermissionProfile) { self.granted_permissions = merge_permission_profiles(self.granted_permissions.as_ref(), Some(&permissions)); diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index 90a340958..931b8429c 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -213,6 +213,10 @@ pub(crate) async fn handle_output_item_done( match ToolRouter::build_tool_call(ctx.sess.as_ref(), item.clone()).await { // The model emitted a tool call; log it, persist the item immediately, and queue the tool execution. Ok(Some(call)) => { + ctx.sess + .accept_mailbox_delivery_for_current_turn(&ctx.turn_context.sub_id) + .await; + let payload_preview = call.payload.log_payload().into_owned(); tracing::info!( thread_id = %ctx.sess.conversation_id,