[codex] add code-mode host failure supervision hooks (#30110)

## Why

A process host should be discarded and rebuilt after critical actor or
V8 failure, while the existing in-process production path must keep its
current cell-error semantics. This change establishes that failure
boundary without adding the host process or remote client.

## What changed

- add optional task-failure supervision to the transport-neutral
code-mode session runtime
- report Tokio cell-actor failures and V8 runtime-thread panics to a
host-provided fail-stop handler
- preserve the existing handler-less in-process behavior
- make host-owned cell ID allocation fail before numeric wraparound

## Follow-up

The V8 panic signal surfaced here should also be consumed by the
`InProcessCodeModeSession` manager in a future change so it can fail the
affected cell. This PR intentionally leaves the handler-less in-process
behavior unchanged while putting the required panic tracking in place.

## Stack

This is **2 of 4** in the process-owned code-mode session stack.

- #30108 is merged into `main`
- The next PR targets this branch

## Validation

- `just test -p codex-code-mode` — 53 passed
- `just argument-comment-lint -p codex-code-mode`
- `just fix -p codex-code-mode`
This commit is contained in:
Channing Conger
2026-06-25 15:33:58 -07:00
committed by GitHub
Unverified
parent e2746fd7e9
commit 6c21297bba
12 changed files with 482 additions and 25 deletions
+1
View File
@@ -2503,6 +2503,7 @@ dependencies = [
"codex-code-mode-protocol",
"codex-protocol",
"deno_core_icudata",
"futures",
"pretty_assertions",
"serde_json",
"tokio",
+1
View File
@@ -19,6 +19,7 @@ workspace = true
codex-code-mode-protocol = { workspace = true }
codex-protocol = { workspace = true }
deno_core_icudata = { workspace = true }
futures = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] }
tokio-util = { workspace = true, features = ["rt"] }
+63 -12
View File
@@ -1,11 +1,14 @@
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use futures::FutureExt;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use super::CellHost;
use super::CellToolCall;
use crate::TaskFailureHandler;
use crate::runtime::RuntimeCommand;
#[derive(Clone, Copy)]
@@ -20,10 +23,20 @@ pub(super) fn spawn_notification<H: CellHost>(
call_id: String,
text: String,
cancellation_token: CancellationToken,
task_failure_handler: Option<TaskFailureHandler>,
) {
tasks.spawn(async move {
if let Err(err) = host.notify(call_id, text, cancellation_token).await {
warn!("failed to deliver code mode notification: {err}");
let callback =
AssertUnwindSafe(async move { host.notify(call_id, text, cancellation_token).await })
.catch_unwind()
.await;
match callback {
Ok(Ok(())) => {}
Ok(Err(err)) => warn!("failed to deliver code mode notification: {err}"),
Err(_) => report_task_failure(
task_failure_handler.as_ref(),
"code mode notification task panicked".to_string(),
),
}
});
}
@@ -34,14 +47,32 @@ pub(super) fn spawn_tool<H: CellHost>(
invocation: CellToolCall,
runtime_tx: std::sync::mpsc::Sender<RuntimeCommand>,
cancellation_token: CancellationToken,
task_failure_handler: Option<TaskFailureHandler>,
) {
tasks.spawn(async move {
let id = invocation.id.clone();
let command = match host.invoke_tool(invocation, cancellation_token).await {
Ok(result) => RuntimeCommand::ToolResponse { id, result },
Err(error_text) => RuntimeCommand::ToolError { id, error_text },
let callback =
AssertUnwindSafe(async move { host.invoke_tool(invocation, cancellation_token).await })
.catch_unwind()
.await;
let (command, failure_reason) = match callback {
Ok(Ok(result)) => (RuntimeCommand::ToolResponse { id, result }, None),
Ok(Err(error_text)) => (RuntimeCommand::ToolError { id, error_text }, None),
Err(_) => {
let failure_reason = "code mode tool task panicked".to_string();
(
RuntimeCommand::ToolError {
id,
error_text: failure_reason.clone(),
},
Some(failure_reason),
)
}
};
let _ = runtime_tx.send(command);
if let Some(failure_reason) = failure_reason {
report_task_failure(task_failure_handler.as_ref(), failure_reason);
}
});
}
@@ -50,28 +81,48 @@ pub(super) async fn finish_callbacks(
notification_tasks: &mut JoinSet<()>,
tool_tasks: &mut JoinSet<()>,
completion: CallbackCompletion,
task_failure_handler: Option<&TaskFailureHandler>,
) {
if matches!(completion, CallbackCompletion::Cancel) {
cancellation_token.cancel();
}
drain_tasks(notification_tasks, "notification").await;
drain_tasks(notification_tasks, "notification", task_failure_handler).await;
cancellation_token.cancel();
drain_tasks(tool_tasks, "tool").await;
drain_tasks(tool_tasks, "tool", task_failure_handler).await;
}
pub(super) fn log_task_result(
pub(super) fn report_task_result(
task_result: Option<Result<(), tokio::task::JoinError>>,
description: &str,
task_failure_handler: Option<&TaskFailureHandler>,
) {
if let Some(Err(err)) = task_result
&& !err.is_cancelled()
{
warn!("code mode {description} task failed: {err}");
report_task_failure(
task_failure_handler,
format!("code mode {description} task failed: {err}"),
);
}
}
async fn drain_tasks(tasks: &mut JoinSet<()>, description: &str) {
while let Some(result) = tasks.join_next().await {
log_task_result(Some(result), description);
/// Emits `failure_reason` as a warning and, when a handler is present, forwards the
/// same reason to it.
///
/// The warning is always logged, regardless of whether a handler was supplied.
fn report_task_failure(task_failure_handler: Option<&TaskFailureHandler>, failure_reason: String) {
    warn!("{failure_reason}");
    // Nothing further to do when no handler was supplied.
    let Some(handler) = task_failure_handler else {
        return;
    };
    handler(failure_reason);
}
/// Awaits every task still held by `tasks`, handing each join result to
/// `report_task_result` together with `description` and the optional handler.
///
/// Returns once `join_next` yields `None`, i.e. the set is empty.
async fn drain_tasks(
    tasks: &mut JoinSet<()>,
    description: &str,
    task_failure_handler: Option<&TaskFailureHandler>,
) {
    loop {
        match tasks.join_next().await {
            Some(joined) => report_task_result(Some(joined), description, task_failure_handler),
            None => break,
        }
    }
}
#[cfg(test)]
#[path = "callbacks_tests.rs"]
mod tests;
@@ -0,0 +1,132 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::mpsc as std_mpsc;
use std::time::Duration;
use pretty_assertions::assert_eq;
use serde_json::Value as JsonValue;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use super::*;
use crate::cell_actor::CellState;
use crate::cell_actor::CompletionCommit;
use crate::runtime::RuntimeCommand;
use crate::session_runtime::CellEvent;
use crate::session_runtime::ToolKind;
use crate::session_runtime::ToolName;
/// Test host whose tool and notification callbacks always panic, used to probe
/// how the callback wrappers react to a panicking `CellHost`.
struct PanickingCallbackHost;
impl CellHost for PanickingCallbackHost {
// Always panics; never produces a tool result.
async fn invoke_tool(
&self,
_invocation: CellToolCall,
_cancellation_token: CancellationToken,
) -> Result<JsonValue, String> {
panic!("tool callback panic probe");
}
// Always panics; never acknowledges the notification.
async fn notify(
&self,
_call_id: String,
_text: String,
_cancellation_token: CancellationToken,
) -> Result<(), String> {
panic!("notification callback panic probe");
}
// Not expected to be reached by the tests in this file; panics if it is.
async fn commit_completion(
&self,
_stored_value_writes: HashMap<String, JsonValue>,
_event: CellEvent,
_pending_initial_yield_items: Option<Vec<crate::session_runtime::OutputItem>>,
_cell_state: Arc<CellState>,
) -> CompletionCommit {
panic!("unexpected completion commit");
}
// Intentionally a no-op.
async fn closed(&self) {}
}
/// A panicking tool callback must produce a `RuntimeCommand::ToolError` for the
/// originating call id and must also deliver the same text to the failure handler.
#[tokio::test]
async fn tool_callback_panic_rejects_the_js_promise_and_reports_failure() {
let mut tasks = JoinSet::new();
let (runtime_tx, runtime_rx) = std_mpsc::channel();
// The failure handler forwards every reported reason into this channel.
let (failure_tx, mut failure_rx) = mpsc::unbounded_channel();
spawn_tool(
&mut tasks,
Arc::new(PanickingCallbackHost),
CellToolCall {
id: "tool-1".to_string(),
name: ToolName {
name: "panic".to_string(),
namespace: None,
},
kind: ToolKind::Function,
input: None,
},
runtime_tx,
CancellationToken::new(),
Some(Arc::new(move |reason| {
let _ = failure_tx.send(reason);
})),
);
// The second `expect` asserts the spawned wrapper task itself joined without a
// `JoinError`, even though the host callback panicked.
tasks
.join_next()
.await
.expect("tool callback task")
.expect("tool callback wrapper");
let command = runtime_rx
.recv_timeout(Duration::from_secs(1))
.expect("tool error command");
let RuntimeCommand::ToolError { id, error_text } = command else {
panic!("expected a tool error command");
};
assert_eq!(id, "tool-1");
assert_eq!(error_text, "code mode tool task panicked");
// The handler must have received the same text that was sent to the runtime.
assert_eq!(failure_rx.recv().await, Some(error_text));
}
/// A panicking notification callback must be reported to the failure handler with
/// the fixed notification-panic reason.
#[tokio::test]
async fn notification_callback_panic_reports_failure() {
let mut tasks = JoinSet::new();
// The failure handler forwards every reported reason into this channel.
let (failure_tx, mut failure_rx) = mpsc::unbounded_channel();
spawn_notification(
&mut tasks,
Arc::new(PanickingCallbackHost),
"notify-1".to_string(),
"hello".to_string(),
CancellationToken::new(),
Some(Arc::new(move |reason| {
let _ = failure_tx.send(reason);
})),
);
// The second `expect` asserts the spawned wrapper task joined without a `JoinError`.
tasks
.join_next()
.await
.expect("notification callback task")
.expect("notification callback wrapper");
let failure_reason = failure_rx.recv().await.expect("notification failure");
assert_eq!(failure_reason, "code mode notification task panicked");
}
/// A non-cancelled `JoinError` passed to `report_task_result` must reach the
/// failure handler with a reason naming the task description.
#[tokio::test]
async fn callback_wrapper_join_error_reports_failure() {
// Produce a real `JoinError` by awaiting a task that panics.
let task_result = tokio::spawn(async {
panic!("callback wrapper panic probe");
})
.await;
let (failure_tx, mut failure_rx) = mpsc::unbounded_channel();
let task_failure_handler: TaskFailureHandler = Arc::new(move |reason| {
let _ = failure_tx.send(reason);
});
report_task_result(Some(task_result), "tool", Some(&task_failure_handler));
let failure_reason = failure_rx.recv().await.expect("wrapper failure");
// Only the prefix is checked; the rest of the reason is the formatted `JoinError`.
assert!(failure_reason.contains("code mode tool task failed"));
}
+32 -3
View File
@@ -14,7 +14,7 @@ use tokio_util::sync::CancellationToken;
use self::callbacks::CallbackCompletion;
use self::callbacks::finish_callbacks;
use self::callbacks::log_task_result;
use self::callbacks::report_task_result;
use self::callbacks::spawn_notification;
use self::callbacks::spawn_tool;
use self::conversions::cell_tool_kind;
@@ -30,6 +30,7 @@ pub(crate) use self::types::CellToolCall;
pub(crate) use self::types::CompletionCommit;
use self::types::CompletionDelivery;
use self::types::ObservationDelivery;
use crate::TaskFailureHandler;
use crate::runtime::PendingRuntimeMode;
use crate::runtime::RuntimeCommand;
use crate::runtime::RuntimeControlCommand;
@@ -50,6 +51,7 @@ impl CellActor {
host: Arc<H>,
initial_observe_mode: ObserveMode,
cell_state: Arc<CellState>,
task_failure_handler: Option<TaskFailureHandler>,
) -> Result<
(
CellHandle,
@@ -66,6 +68,7 @@ impl CellActor {
runtime_request(request),
event_tx,
PendingRuntimeMode::PauseUntilResumed,
task_failure_handler.clone(),
)?;
let handle = CellHandle::new(command_tx, Arc::clone(&cell_state));
let task = run_cell(
@@ -82,6 +85,7 @@ impl CellActor {
mode: initial_observe_mode,
response_tx: initial_response_tx,
},
task_failure_handler,
);
let initial_response =
Box::pin(async move { initial_response_rx.await.unwrap_or(Err(CellError::Closed)) });
@@ -107,6 +111,7 @@ async fn run_cell<H: CellHost>(
mut event_rx: mpsc::UnboundedReceiver<RuntimeEvent>,
command_rx: mpsc::UnboundedReceiver<CellCommand>,
initial_observer: Observer,
task_failure_handler: Option<TaskFailureHandler>,
) {
let CellContext {
runtime_tx,
@@ -123,6 +128,7 @@ async fn run_cell<H: CellHost>(
let mut termination = false;
let mut runtime_closed = false;
let mut runtime_paused = false;
let mut runtime_failure_reported = false;
let mut yield_timer: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
let mut notification_tasks = JoinSet::new();
let mut tool_tasks = JoinSet::new();
@@ -149,6 +155,7 @@ async fn run_cell<H: CellHost>(
&mut notification_tasks,
&mut tool_tasks,
CallbackCompletion::Cancel,
task_failure_handler.as_ref(),
).await;
finish_termination(
&cell_state,
@@ -259,6 +266,7 @@ async fn run_cell<H: CellHost>(
&mut notification_tasks,
&mut tool_tasks,
CallbackCompletion::Cancel,
task_failure_handler.as_ref(),
).await;
finish_termination(
&cell_state,
@@ -269,11 +277,20 @@ async fn run_cell<H: CellHost>(
);
break;
}
if !runtime_failure_reported
&& let Some(task_failure_handler) = &task_failure_handler
{
runtime_failure_reported = true;
task_failure_handler(
"code-mode V8 runtime thread ended unexpectedly".to_string(),
);
}
finish_callbacks(
&callback_cancellation_token,
&mut notification_tasks,
&mut tool_tasks,
CallbackCompletion::DrainNotifications,
task_failure_handler.as_ref(),
)
.await;
let event = CellEvent::Completed {
@@ -376,6 +393,7 @@ async fn run_cell<H: CellHost>(
call_id,
text,
callback_cancellation_token.child_token(),
task_failure_handler.clone(),
);
}
RuntimeEvent::ToolCall { id, name, kind, input } => {
@@ -394,6 +412,7 @@ async fn run_cell<H: CellHost>(
},
runtime_tx.clone(),
callback_cancellation_token.child_token(),
task_failure_handler.clone(),
);
}
RuntimeEvent::Result { stored_value_writes, error_text } => {
@@ -405,6 +424,7 @@ async fn run_cell<H: CellHost>(
&mut notification_tasks,
&mut tool_tasks,
CallbackCompletion::Cancel,
task_failure_handler.as_ref(),
).await;
finish_termination(
&cell_state,
@@ -420,6 +440,7 @@ async fn run_cell<H: CellHost>(
&mut notification_tasks,
&mut tool_tasks,
CallbackCompletion::DrainNotifications,
task_failure_handler.as_ref(),
)
.await;
let event = CellEvent::Completed {
@@ -455,13 +476,20 @@ async fn run_cell<H: CellHost>(
}
}
}
RuntimeEvent::ThreadPanicked => {
runtime_failure_reported = true;
}
}
}
task_result = notification_tasks.join_next(), if !notification_tasks.is_empty() => {
log_task_result(task_result, "notification");
report_task_result(
task_result,
"notification",
task_failure_handler.as_ref(),
);
}
task_result = tool_tasks.join_next(), if !tool_tasks.is_empty() => {
log_task_result(task_result, "tool");
report_task_result(task_result, "tool", task_failure_handler.as_ref());
}
}
}
@@ -479,6 +507,7 @@ async fn run_cell<H: CellHost>(
&mut notification_tasks,
&mut tool_tasks,
CallbackCompletion::Cancel,
task_failure_handler.as_ref(),
)
.await;
host.closed().await;
@@ -102,6 +102,18 @@ fn spawn_cell_actor_harness(initial_observe_mode: ObserveMode) -> CellActorHarne
fn spawn_cell_actor_harness_with_host<H: CellHost>(
initial_observe_mode: ObserveMode,
host: Arc<H>,
) -> CellActorHarness {
spawn_cell_actor_harness_with_host_and_failure_handler(
initial_observe_mode,
host,
/*task_failure_handler*/ None,
)
}
fn spawn_cell_actor_harness_with_host_and_failure_handler<H: CellHost>(
initial_observe_mode: ObserveMode,
host: Arc<H>,
task_failure_handler: Option<TaskFailureHandler>,
) -> CellActorHarness {
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (command_tx, command_rx) = mpsc::unbounded_channel();
@@ -118,6 +130,7 @@ fn spawn_cell_actor_harness_with_host<H: CellHost>(
},
runtime_event_tx,
PendingRuntimeMode::PauseUntilResumed,
/*task_failure_handler*/ None,
)
.unwrap();
let (runtime_control_tx, runtime_control_rx) = std_mpsc::channel();
@@ -137,6 +150,7 @@ fn spawn_cell_actor_harness_with_host<H: CellHost>(
mode: initial_observe_mode,
response_tx: initial_event_tx,
},
task_failure_handler,
));
CellActorHarness {
@@ -149,6 +163,54 @@ fn spawn_cell_actor_harness_with_host<H: CellHost>(
}
}
/// When the runtime event channel closes without a prior `ThreadPanicked` event,
/// the cell actor must report an unexpected runtime exit to the failure handler.
#[tokio::test]
async fn unexpected_runtime_thread_exit_is_reported_to_the_session_owner() {
let (failure_tx, mut failure_rx) = mpsc::unbounded_channel();
let harness = spawn_cell_actor_harness_with_host_and_failure_handler(
ObserveMode::YieldAfter(Duration::from_secs(60)),
Arc::new(TestHost),
Some(Arc::new(move |reason| {
let _ = failure_tx.send(reason);
})),
);
// Dropping the sender closes the event channel without sending any event.
drop(harness.event_tx);
assert_eq!(
tokio::time::timeout(Duration::from_secs(1), failure_rx.recv())
.await
.expect("runtime failure timeout")
.expect("runtime failure"),
"code-mode V8 runtime thread ended unexpectedly"
);
// The initial observer must still receive an `Ok` event.
assert!(
harness
.initial_event_rx
.await
.expect("initial event")
.is_ok()
);
// The actor task itself must finish without a join error.
harness.task.await.expect("cell task");
}
/// Without a failure handler, a `ThreadPanicked` event followed by channel closure
/// must still surface to the observer as a completed cell carrying an error text.
#[tokio::test]
async fn runtime_thread_panic_remains_a_cell_error_without_owner_supervision() {
let harness = spawn_cell_actor_harness(ObserveMode::YieldAfter(Duration::from_secs(60)));
harness
.event_tx
.send(RuntimeEvent::ThreadPanicked)
.expect("runtime panic event");
// Close the event channel after the panic event has been queued.
drop(harness.event_tx);
assert_eq!(
harness.initial_event_rx.await.expect("initial event"),
Ok(CellEvent::Completed {
content_items: Vec::new(),
error_text: Some("exec runtime ended unexpectedly".to_string()),
})
);
harness.task.await.expect("cell task");
}
async fn wait_for_notification(host: &RecordingHost) {
tokio::time::timeout(Duration::from_secs(1), async {
while !host.notified.load(Ordering::Acquire) {
+2
View File
@@ -3,6 +3,8 @@ mod runtime;
mod service;
mod session_runtime;
/// Shared, thread-safe callback that is handed a textual failure reason when a
/// supervised code-mode task or runtime thread fails.
pub(crate) type TaskFailureHandler = std::sync::Arc<dyn Fn(String) + Send + Sync>;
pub use codex_code_mode_protocol::*;
pub use service::InProcessCodeModeSession;
pub use service::InProcessCodeModeSessionProvider;
+64 -1
View File
@@ -5,6 +5,8 @@ mod timers;
mod value;
use std::collections::HashMap;
use std::panic::AssertUnwindSafe;
use std::panic::catch_unwind;
use std::sync::OnceLock;
use std::sync::mpsc as std_mpsc;
use std::thread;
@@ -18,6 +20,8 @@ use codex_protocol::ToolName;
use serde_json::Value as JsonValue;
use tokio::sync::mpsc;
use crate::TaskFailureHandler;
const EXIT_SENTINEL: &str = "__codex_code_mode_exit__";
#[derive(Debug)]
@@ -63,6 +67,7 @@ pub(crate) enum RuntimeEvent {
stored_value_writes: HashMap<String, JsonValue>,
error_text: Option<String>,
},
ThreadPanicked,
}
pub(crate) fn spawn_runtime(
@@ -70,6 +75,7 @@ pub(crate) fn spawn_runtime(
request: ExecuteRequest,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
pending_mode: PendingRuntimeMode,
task_failure_handler: Option<TaskFailureHandler>,
) -> Result<
(
std_mpsc::Sender<RuntimeCommand>,
@@ -96,7 +102,7 @@ pub(crate) fn spawn_runtime(
stored_values,
};
thread::spawn(move || {
spawn_supervised_runtime_thread(event_tx.clone(), task_failure_handler, move || {
run_runtime(
config,
event_tx,
@@ -114,6 +120,21 @@ pub(crate) fn spawn_runtime(
Ok((command_tx, control_tx, isolate_handle))
}
/// Runs `runtime` on a newly spawned thread and watches it for panics.
///
/// If `runtime` unwinds, the optional `task_failure_handler` is invoked first with a
/// fixed reason string, after which `RuntimeEvent::ThreadPanicked` is sent on
/// `event_tx`. A failed send is ignored. The thread's join handle is discarded.
fn spawn_supervised_runtime_thread(
    event_tx: mpsc::UnboundedSender<RuntimeEvent>,
    task_failure_handler: Option<TaskFailureHandler>,
    runtime: impl FnOnce() + Send + 'static,
) {
    thread::spawn(move || {
        // Only the fact of a panic is kept; the payload is dropped right here.
        let panicked = catch_unwind(AssertUnwindSafe(runtime)).is_err();
        if !panicked {
            return;
        }
        if let Some(handler) = task_failure_handler {
            handler("code-mode V8 runtime thread panicked".to_string());
        }
        let _ = event_tx.send(RuntimeEvent::ThreadPanicked);
    });
}
#[derive(Clone)]
struct RuntimeConfig {
tool_call_id: String,
@@ -336,6 +357,7 @@ mod tests {
use super::RuntimeControlCommand;
use super::RuntimeEvent;
use super::spawn_runtime;
use super::spawn_supervised_runtime_thread;
use crate::FunctionCallOutputContentItem;
fn execute_request(source: &str) -> ExecuteRequest {
@@ -348,6 +370,45 @@ mod tests {
}
}
/// The failure handler must be notified of a runtime-thread panic even when the
/// event receiver is already gone.
#[tokio::test]
async fn runtime_thread_panic_before_initialization_is_reported_directly() {
let (event_tx, event_rx) = mpsc::unbounded_channel();
// Drop the receiver up front so the event channel is already closed.
drop(event_rx);
let (failure_tx, mut failure_rx) = mpsc::unbounded_channel();
spawn_supervised_runtime_thread(
event_tx,
Some(std::sync::Arc::new(move |reason| {
let _ = failure_tx.send(reason);
})),
|| panic!("runtime thread panic probe"),
);
assert_eq!(
tokio::time::timeout(Duration::from_secs(1), failure_rx.recv())
.await
.expect("runtime failure timeout")
.expect("runtime failure"),
"code-mode V8 runtime thread panicked"
);
}
/// With no failure handler installed, a runtime-thread panic must still be
/// delivered on the event channel as `RuntimeEvent::ThreadPanicked`.
#[tokio::test]
async fn runtime_thread_panic_is_forwarded_without_owner_supervision() {
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
spawn_supervised_runtime_thread(
event_tx,
/*task_failure_handler*/ None,
|| panic!("runtime thread panic probe"),
);
assert!(matches!(
tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
.await
.expect("runtime panic event timeout"),
Some(RuntimeEvent::ThreadPanicked)
));
}
#[tokio::test]
async fn terminate_execution_stops_cpu_bound_module() {
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
@@ -356,6 +417,7 @@ mod tests {
execute_request("while (true) {}"),
event_tx,
PendingRuntimeMode::Continue,
/*task_failure_handler*/ None,
)
.unwrap();
@@ -398,6 +460,7 @@ await new Promise(() => {});
),
event_tx,
PendingRuntimeMode::PauseUntilResumed,
/*task_failure_handler*/ None,
)
.unwrap();
+12
View File
@@ -87,6 +87,18 @@ impl InProcessCodeModeSession {
}
}
/// Creates a session that wraps `delegate` in a `ProtocolDelegate` and builds its
/// runtime with `task_failure_handler` installed.
pub fn with_delegate_and_task_failure_handler(
    delegate: Arc<dyn CodeModeSessionDelegate>,
    task_failure_handler: Arc<dyn Fn(String) + Send + Sync>,
) -> Self {
    let protocol_delegate = Arc::new(ProtocolDelegate { delegate });
    let runtime = SessionRuntime::new_with_task_failure_handler(
        protocol_delegate,
        Some(task_failure_handler),
    );
    Self { runtime }
}
pub async fn execute(&self, request: ExecuteRequest) -> Result<StartedCell, String> {
let yield_time_ms = request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS);
let started = self
+31 -9
View File
@@ -24,6 +24,7 @@ pub(crate) use self::types::SessionRuntimeDelegate;
pub(crate) use self::types::ToolDefinition;
pub(crate) use self::types::ToolKind;
pub(crate) use self::types::ToolName;
use crate::TaskFailureHandler;
use crate::cell_actor::CellActor;
use crate::cell_actor::CellError;
use crate::cell_actor::CellEventFuture;
@@ -46,11 +47,19 @@ struct Inner<D: SessionRuntimeDelegate> {
cell_tasks: TaskTracker,
shutdown_token: CancellationToken,
delegate: Arc<D>,
task_failure_handler: Option<TaskFailureHandler>,
next_cell_id: AtomicU64,
}
impl<D: SessionRuntimeDelegate> SessionRuntime<D> {
/// Creates a runtime for `delegate` with no task-failure handler, by delegating
/// to `new_with_task_failure_handler`.
pub(crate) fn new(delegate: Arc<D>) -> Self {
Self::new_with_task_failure_handler(delegate, /*task_failure_handler*/ None)
}
pub(crate) fn new_with_task_failure_handler(
delegate: Arc<D>,
task_failure_handler: Option<TaskFailureHandler>,
) -> Self {
Self {
inner: Arc::new(Inner {
stored_values: Mutex::new(HashMap::new()),
@@ -58,6 +67,7 @@ impl<D: SessionRuntimeDelegate> SessionRuntime<D> {
cell_tasks: TaskTracker::new(),
shutdown_token: CancellationToken::new(),
delegate,
task_failure_handler,
next_cell_id: AtomicU64::new(1),
}),
}
@@ -71,7 +81,7 @@ impl<D: SessionRuntimeDelegate> SessionRuntime<D> {
if self.inner.shutdown_token.is_cancelled() {
return Err(Error::ShuttingDown);
}
let cell_id = self.allocate_cell_id();
let cell_id = self.allocate_cell_id()?;
let initial_event = self
.start_cell(cell_id.clone(), request, initial_observe_mode)
.await?;
@@ -133,13 +143,14 @@ impl<D: SessionRuntimeDelegate> SessionRuntime<D> {
Ok(())
}
fn allocate_cell_id(&self) -> CellId {
CellId::new(
self.inner
.next_cell_id
.fetch_add(1, Ordering::Relaxed)
.to_string(),
)
fn allocate_cell_id(&self) -> Result<CellId, Error> {
self.inner
.next_cell_id
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |next_cell_id| {
next_cell_id.checked_add(1)
})
.map(|cell_id| CellId::new(cell_id.to_string()))
.map_err(|_| Error::CellIdSpaceExhausted)
}
async fn start_cell(
@@ -167,10 +178,21 @@ impl<D: SessionRuntimeDelegate> SessionRuntime<D> {
host,
initial_observe_mode,
cell_state,
self.inner.task_failure_handler.clone(),
)
.map_err(Error::Runtime)?;
cells.insert(cell_id.clone(), handle);
self.inner.cell_tasks.spawn(task);
let task = self.inner.cell_tasks.spawn(task);
if let Some(task_failure_handler) = self.inner.task_failure_handler.clone() {
let failed_cell_id = cell_id.clone();
let _failure_watcher = self.inner.cell_tasks.spawn(async move {
if let Err(err) = task.await {
task_failure_handler(format!(
"code-mode cell {failed_cell_id} task failed: {err}"
));
}
});
}
drop(cells);
Ok(map_actor_event(cell_id, initial_event))
}
@@ -15,6 +15,8 @@ use crate::cell_actor::CompletionCommit;
struct RecordingDelegate;
struct PanickingClosedDelegate;
impl SessionRuntimeDelegate for RecordingDelegate {
async fn invoke_tool(
&self,
@@ -37,6 +39,62 @@ impl SessionRuntimeDelegate for RecordingDelegate {
fn cell_closed(&self, _cell_id: &CellId) {}
}
// Test delegate whose tool and notification callbacks succeed trivially but whose
// `cell_closed` hook panics.
impl SessionRuntimeDelegate for PanickingClosedDelegate {
// Always succeeds with a JSON null result.
async fn invoke_tool(
&self,
_invocation: NestedToolCall,
_cancellation_token: CancellationToken,
) -> Result<JsonValue, String> {
Ok(JsonValue::Null)
}
// Always succeeds without doing anything.
async fn notify(
&self,
_call_id: String,
_cell_id: CellId,
_text: String,
_cancellation_token: CancellationToken,
) -> Result<(), String> {
Ok(())
}
// Deliberately panics so tests can observe how a panic in this hook is reported.
fn cell_closed(&self, _cell_id: &CellId) {
panic!("cell close panic probe");
}
}
/// A panic raised from the delegate's `cell_closed` hook must be reported to the
/// failure handler, and the report must already be queued once `shutdown` returns.
#[tokio::test]
async fn reports_cell_actor_panics_to_the_owner() {
let (failure_tx, mut failure_rx) = tokio::sync::mpsc::unbounded_channel();
let runtime = SessionRuntime::new_with_task_failure_handler(
Arc::new(PanickingClosedDelegate),
Some(Arc::new(move |reason| {
let _ = failure_tx.send(reason);
})),
);
let started = runtime
.execute(
execute_request(r#"text("done");"#),
ObserveMode::YieldAfter(Duration::from_secs(1)),
)
.await
.expect("start cell");
// The cell itself completes normally; the panic only happens in `cell_closed`.
assert_eq!(
started.initial_event().await,
Ok(CellEvent::Completed {
content_items: vec![OutputItem::Text {
text: "done".to_string(),
}],
error_text: None,
})
);
runtime.shutdown().await.expect("shutdown runtime");
// `try_recv` (non-blocking) is used on purpose: the failure must be present
// immediately after shutdown rather than arriving later.
let failure = failure_rx
.try_recv()
.expect("shutdown should wait for the cell failure watcher");
assert!(failure.contains("code-mode cell 1 task failed"));
}
#[tokio::test]
async fn termination_rejects_a_waiting_store_commit_before_the_next_cell_can_load_it() {
let runtime = SessionRuntime::new(Arc::new(RecordingDelegate));
@@ -118,6 +176,26 @@ fn execute_request(source: &str) -> CreateCellRequest {
}
}
/// With the cell-id counter forced to `u64::MAX`, `execute` must fail with
/// `Error::CellIdSpaceExhausted`.
#[tokio::test]
async fn cell_id_allocation_fails_before_wrapping() {
let runtime = SessionRuntime::new(Arc::new(RecordingDelegate));
// Force the counter to its maximum so the next allocation has no successor value.
runtime
.inner
.next_cell_id
.store(u64::MAX, Ordering::Relaxed);
assert_eq!(
runtime
.execute(
execute_request(r#"text("unreachable");"#),
ObserveMode::YieldAfter(Duration::from_secs(1)),
)
.await
.err(),
Some(Error::CellIdSpaceExhausted)
);
}
#[tokio::test]
#[expect(
clippy::await_holding_invalid_type,
@@ -138,6 +138,7 @@ pub(crate) trait SessionRuntimeDelegate: Send + Sync + 'static {
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum Error {
ShuttingDown,
CellIdSpaceExhausted,
DuplicateCell(CellId),
MissingCell(CellId),
BusyObserver(CellId),
@@ -150,6 +151,9 @@ impl fmt::Display for Error {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ShuttingDown => formatter.write_str("code mode session is shutting down"),
Self::CellIdSpaceExhausted => {
formatter.write_str("code mode session exhausted its cell ID space")
}
Self::DuplicateCell(cell_id) => write!(formatter, "exec cell {cell_id} already exists"),
Self::MissingCell(cell_id) => write!(formatter, "exec cell {cell_id} not found"),
Self::BusyObserver(cell_id) => {