fix: races in end of turn (#16566)

This commit is contained in:
jif-oai
2026-04-02 15:55:55 +02:00
committed by GitHub
Unverified
parent bd50496411
commit e47ed5e57f
5 changed files with 72 additions and 6 deletions
+12 -4
View File
@@ -293,6 +293,7 @@ use crate::shell_snapshot::ShellSnapshot;
use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::state::ActiveTurn;
use crate::state::MailboxDeliveryPhase;
use crate::state::SessionServices;
use crate::state::SessionState;
use crate::tasks::GhostSnapshotTask;
@@ -4046,6 +4047,16 @@ impl Session {
}
pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) {
self.set_mailbox_delivery_phase(sub_id, MailboxDeliveryPhase::NextTurn)
.await;
}
pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) {
self.set_mailbox_delivery_phase(sub_id, MailboxDeliveryPhase::CurrentTurn)
.await;
}
async fn set_mailbox_delivery_phase(&self, sub_id: &str, phase: MailboxDeliveryPhase) {
let turn_state = {
let active = self.active_turn.lock().await;
active.as_ref().and_then(|active_turn| {
@@ -4058,10 +4069,7 @@ impl Session {
let Some(turn_state) = turn_state else {
return;
};
turn_state
.lock()
.await
.defer_mailbox_delivery_to_next_turn();
turn_state.lock().await.set_mailbox_delivery_phase(phase);
}
pub(crate) fn subscribe_mailbox_seq(&self) -> watch::Receiver<u64> {
+49
View File
@@ -4896,6 +4896,55 @@ async fn steered_input_reopens_mailbox_delivery_for_current_turn() {
);
}
#[tokio::test]
async fn tool_calls_reopen_mailbox_delivery_for_current_turn() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
let communication = InterAgentCommunication::new(
AgentPath::try_from("/root/worker").expect("worker path should parse"),
AgentPath::root(),
Vec::new(),
"queued child update".to_string(),
/*trigger_turn*/ false,
);
sess.spawn_task(
Arc::clone(&tc),
Vec::new(),
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: true,
},
)
.await;
sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await;
sess.enqueue_mailbox_communication(communication.clone());
let item = ResponseItem::FunctionCall {
id: None,
name: "test_tool".to_string(),
namespace: None,
arguments: "{}".to_string(),
call_id: "call-1".to_string(),
};
let mut ctx = HandleOutputCtx {
sess: Arc::clone(&sess),
turn_context: Arc::clone(&tc),
tool_runtime: test_tool_runtime(Arc::clone(&sess), Arc::clone(&tc)),
cancellation_token: CancellationToken::new(),
};
let output = handle_output_item_done(&mut ctx, item, /*previously_active_item*/ None)
.await
.expect("tool call should be handled");
assert!(output.needs_follow_up);
assert!(output.tool_future.is_some());
assert_eq!(
sess.get_pending_input().await,
vec![communication.to_response_input_item()],
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
+1
View File
@@ -5,5 +5,6 @@ mod turn;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::MailboxDeliveryPhase;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;
+6 -2
View File
@@ -215,17 +215,21 @@ impl TurnState {
}
pub(crate) fn defer_mailbox_delivery_to_next_turn(&mut self) {
self.mailbox_delivery_phase = MailboxDeliveryPhase::NextTurn;
self.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn);
}
pub(crate) fn accept_mailbox_delivery_for_current_turn(&mut self) {
self.mailbox_delivery_phase = MailboxDeliveryPhase::CurrentTurn;
self.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn);
}
pub(crate) fn accepts_mailbox_delivery_for_current_turn(&self) -> bool {
self.mailbox_delivery_phase == MailboxDeliveryPhase::CurrentTurn
}
pub(crate) fn set_mailbox_delivery_phase(&mut self, phase: MailboxDeliveryPhase) {
self.mailbox_delivery_phase = phase;
}
pub(crate) fn record_granted_permissions(&mut self, permissions: PermissionProfile) {
self.granted_permissions =
merge_permission_profiles(self.granted_permissions.as_ref(), Some(&permissions));
+4
View File
@@ -213,6 +213,10 @@ pub(crate) async fn handle_output_item_done(
match ToolRouter::build_tool_call(ctx.sess.as_ref(), item.clone()).await {
// The model emitted a tool call; log it, persist the item immediately, and queue the tool execution.
Ok(Some(call)) => {
ctx.sess
.accept_mailbox_delivery_for_current_turn(&ctx.turn_context.sub_id)
.await;
let payload_preview = call.payload.log_payload().into_owned();
tracing::info!(
thread_id = %ctx.sess.conversation_id,