From 6c21297bba7da6ef9c14362406eec59aba5cbfce Mon Sep 17 00:00:00 2001
From: Channing Conger
Date: Thu, 25 Jun 2026 15:33:58 -0700
Subject: [PATCH] [codex] add code-mode host failure supervision hooks (#30110)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Why

A process host should be discarded and rebuilt after critical actor or V8 failure, while the existing in-process production path must keep its current cell-error semantics. This change establishes that failure boundary without adding the host process or remote client.

## What changed

- add optional task-failure supervision to the transport-neutral code-mode session runtime
- report Tokio cell-actor failures and V8 runtime-thread panics to a host-provided fail-stop handler
- preserve the existing handler-less in-process behavior
- make host-owned cell ID allocation fail before numeric wraparound

## Follow-up

The V8 panic signal surfaced here should also be consumed by the `InProcessCodeModeSession` manager in a future change so it can fail the affected cell. This PR intentionally leaves the handler-less in-process behavior unchanged while putting the required panic tracking in place.

## Stack

This is **2 of 4** in the process-owned code-mode session stack.

- #30108 is merged into `main` - The next PR targets this branch ## Validation - `just test -p codex-code-mode` — 53 passed - `just argument-comment-lint -p codex-code-mode` - `just fix -p codex-code-mode` --- codex-rs/Cargo.lock | 1 + codex-rs/code-mode/Cargo.toml | 1 + .../code-mode/src/cell_actor/callbacks.rs | 75 ++++++++-- .../src/cell_actor/callbacks_tests.rs | 132 ++++++++++++++++++ codex-rs/code-mode/src/cell_actor/mod.rs | 35 ++++- codex-rs/code-mode/src/cell_actor/tests.rs | 62 ++++++++ codex-rs/code-mode/src/lib.rs | 2 + codex-rs/code-mode/src/runtime/mod.rs | 65 ++++++++- codex-rs/code-mode/src/service.rs | 12 ++ codex-rs/code-mode/src/session_runtime/mod.rs | 40 ++++-- .../code-mode/src/session_runtime/tests.rs | 78 +++++++++++ .../code-mode/src/session_runtime/types.rs | 4 + 12 files changed, 482 insertions(+), 25 deletions(-) create mode 100644 codex-rs/code-mode/src/cell_actor/callbacks_tests.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 3837df62e..b14702282 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2503,6 +2503,7 @@ dependencies = [ "codex-code-mode-protocol", "codex-protocol", "deno_core_icudata", + "futures", "pretty_assertions", "serde_json", "tokio", diff --git a/codex-rs/code-mode/Cargo.toml b/codex-rs/code-mode/Cargo.toml index 09f1d7380..6b91e64fe 100644 --- a/codex-rs/code-mode/Cargo.toml +++ b/codex-rs/code-mode/Cargo.toml @@ -19,6 +19,7 @@ workspace = true codex-code-mode-protocol = { workspace = true } codex-protocol = { workspace = true } deno_core_icudata = { workspace = true } +futures = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] } tokio-util = { workspace = true, features = ["rt"] } diff --git a/codex-rs/code-mode/src/cell_actor/callbacks.rs b/codex-rs/code-mode/src/cell_actor/callbacks.rs index 9db66ed3a..08f7cbac9 100644 --- a/codex-rs/code-mode/src/cell_actor/callbacks.rs +++ 
b/codex-rs/code-mode/src/cell_actor/callbacks.rs @@ -1,11 +1,14 @@ +use std::panic::AssertUnwindSafe; use std::sync::Arc; +use futures::FutureExt; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::warn; use super::CellHost; use super::CellToolCall; +use crate::TaskFailureHandler; use crate::runtime::RuntimeCommand; #[derive(Clone, Copy)] @@ -20,10 +23,20 @@ pub(super) fn spawn_notification( call_id: String, text: String, cancellation_token: CancellationToken, + task_failure_handler: Option, ) { tasks.spawn(async move { - if let Err(err) = host.notify(call_id, text, cancellation_token).await { - warn!("failed to deliver code mode notification: {err}"); + let callback = + AssertUnwindSafe(async move { host.notify(call_id, text, cancellation_token).await }) + .catch_unwind() + .await; + match callback { + Ok(Ok(())) => {} + Ok(Err(err)) => warn!("failed to deliver code mode notification: {err}"), + Err(_) => report_task_failure( + task_failure_handler.as_ref(), + "code mode notification task panicked".to_string(), + ), } }); } @@ -34,14 +47,32 @@ pub(super) fn spawn_tool( invocation: CellToolCall, runtime_tx: std::sync::mpsc::Sender, cancellation_token: CancellationToken, + task_failure_handler: Option, ) { tasks.spawn(async move { let id = invocation.id.clone(); - let command = match host.invoke_tool(invocation, cancellation_token).await { - Ok(result) => RuntimeCommand::ToolResponse { id, result }, - Err(error_text) => RuntimeCommand::ToolError { id, error_text }, + let callback = + AssertUnwindSafe(async move { host.invoke_tool(invocation, cancellation_token).await }) + .catch_unwind() + .await; + let (command, failure_reason) = match callback { + Ok(Ok(result)) => (RuntimeCommand::ToolResponse { id, result }, None), + Ok(Err(error_text)) => (RuntimeCommand::ToolError { id, error_text }, None), + Err(_) => { + let failure_reason = "code mode tool task panicked".to_string(); + ( + RuntimeCommand::ToolError { + id, + error_text: 
failure_reason.clone(), + }, + Some(failure_reason), + ) + } }; let _ = runtime_tx.send(command); + if let Some(failure_reason) = failure_reason { + report_task_failure(task_failure_handler.as_ref(), failure_reason); + } }); } @@ -50,28 +81,48 @@ pub(super) async fn finish_callbacks( notification_tasks: &mut JoinSet<()>, tool_tasks: &mut JoinSet<()>, completion: CallbackCompletion, + task_failure_handler: Option<&TaskFailureHandler>, ) { if matches!(completion, CallbackCompletion::Cancel) { cancellation_token.cancel(); } - drain_tasks(notification_tasks, "notification").await; + drain_tasks(notification_tasks, "notification", task_failure_handler).await; cancellation_token.cancel(); - drain_tasks(tool_tasks, "tool").await; + drain_tasks(tool_tasks, "tool", task_failure_handler).await; } -pub(super) fn log_task_result( +pub(super) fn report_task_result( task_result: Option>, description: &str, + task_failure_handler: Option<&TaskFailureHandler>, ) { if let Some(Err(err)) = task_result && !err.is_cancelled() { - warn!("code mode {description} task failed: {err}"); + report_task_failure( + task_failure_handler, + format!("code mode {description} task failed: {err}"), + ); } } -async fn drain_tasks(tasks: &mut JoinSet<()>, description: &str) { - while let Some(result) = tasks.join_next().await { - log_task_result(Some(result), description); +fn report_task_failure(task_failure_handler: Option<&TaskFailureHandler>, failure_reason: String) { + warn!("{failure_reason}"); + if let Some(task_failure_handler) = task_failure_handler { + task_failure_handler(failure_reason); } } + +async fn drain_tasks( + tasks: &mut JoinSet<()>, + description: &str, + task_failure_handler: Option<&TaskFailureHandler>, +) { + while let Some(result) = tasks.join_next().await { + report_task_result(Some(result), description, task_failure_handler); + } +} + +#[cfg(test)] +#[path = "callbacks_tests.rs"] +mod tests; diff --git a/codex-rs/code-mode/src/cell_actor/callbacks_tests.rs 
b/codex-rs/code-mode/src/cell_actor/callbacks_tests.rs new file mode 100644 index 000000000..585f7808a --- /dev/null +++ b/codex-rs/code-mode/src/cell_actor/callbacks_tests.rs @@ -0,0 +1,132 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::mpsc as std_mpsc; +use std::time::Duration; + +use pretty_assertions::assert_eq; +use serde_json::Value as JsonValue; +use tokio::sync::mpsc; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; + +use super::*; +use crate::cell_actor::CellState; +use crate::cell_actor::CompletionCommit; +use crate::runtime::RuntimeCommand; +use crate::session_runtime::CellEvent; +use crate::session_runtime::ToolKind; +use crate::session_runtime::ToolName; + +struct PanickingCallbackHost; + +impl CellHost for PanickingCallbackHost { + async fn invoke_tool( + &self, + _invocation: CellToolCall, + _cancellation_token: CancellationToken, + ) -> Result { + panic!("tool callback panic probe"); + } + + async fn notify( + &self, + _call_id: String, + _text: String, + _cancellation_token: CancellationToken, + ) -> Result<(), String> { + panic!("notification callback panic probe"); + } + + async fn commit_completion( + &self, + _stored_value_writes: HashMap, + _event: CellEvent, + _pending_initial_yield_items: Option>, + _cell_state: Arc, + ) -> CompletionCommit { + panic!("unexpected completion commit"); + } + + async fn closed(&self) {} +} + +#[tokio::test] +async fn tool_callback_panic_rejects_the_js_promise_and_reports_failure() { + let mut tasks = JoinSet::new(); + let (runtime_tx, runtime_rx) = std_mpsc::channel(); + let (failure_tx, mut failure_rx) = mpsc::unbounded_channel(); + spawn_tool( + &mut tasks, + Arc::new(PanickingCallbackHost), + CellToolCall { + id: "tool-1".to_string(), + name: ToolName { + name: "panic".to_string(), + namespace: None, + }, + kind: ToolKind::Function, + input: None, + }, + runtime_tx, + CancellationToken::new(), + Some(Arc::new(move |reason| { + let _ = failure_tx.send(reason); + 
})), + ); + + tasks + .join_next() + .await + .expect("tool callback task") + .expect("tool callback wrapper"); + let command = runtime_rx + .recv_timeout(Duration::from_secs(1)) + .expect("tool error command"); + let RuntimeCommand::ToolError { id, error_text } = command else { + panic!("expected a tool error command"); + }; + assert_eq!(id, "tool-1"); + assert_eq!(error_text, "code mode tool task panicked"); + assert_eq!(failure_rx.recv().await, Some(error_text)); +} + +#[tokio::test] +async fn notification_callback_panic_reports_failure() { + let mut tasks = JoinSet::new(); + let (failure_tx, mut failure_rx) = mpsc::unbounded_channel(); + spawn_notification( + &mut tasks, + Arc::new(PanickingCallbackHost), + "notify-1".to_string(), + "hello".to_string(), + CancellationToken::new(), + Some(Arc::new(move |reason| { + let _ = failure_tx.send(reason); + })), + ); + + tasks + .join_next() + .await + .expect("notification callback task") + .expect("notification callback wrapper"); + let failure_reason = failure_rx.recv().await.expect("notification failure"); + assert_eq!(failure_reason, "code mode notification task panicked"); +} + +#[tokio::test] +async fn callback_wrapper_join_error_reports_failure() { + let task_result = tokio::spawn(async { + panic!("callback wrapper panic probe"); + }) + .await; + let (failure_tx, mut failure_rx) = mpsc::unbounded_channel(); + let task_failure_handler: TaskFailureHandler = Arc::new(move |reason| { + let _ = failure_tx.send(reason); + }); + + report_task_result(Some(task_result), "tool", Some(&task_failure_handler)); + + let failure_reason = failure_rx.recv().await.expect("wrapper failure"); + assert!(failure_reason.contains("code mode tool task failed")); +} diff --git a/codex-rs/code-mode/src/cell_actor/mod.rs b/codex-rs/code-mode/src/cell_actor/mod.rs index 24eca2251..7533f4b0c 100644 --- a/codex-rs/code-mode/src/cell_actor/mod.rs +++ b/codex-rs/code-mode/src/cell_actor/mod.rs @@ -14,7 +14,7 @@ use 
tokio_util::sync::CancellationToken; use self::callbacks::CallbackCompletion; use self::callbacks::finish_callbacks; -use self::callbacks::log_task_result; +use self::callbacks::report_task_result; use self::callbacks::spawn_notification; use self::callbacks::spawn_tool; use self::conversions::cell_tool_kind; @@ -30,6 +30,7 @@ pub(crate) use self::types::CellToolCall; pub(crate) use self::types::CompletionCommit; use self::types::CompletionDelivery; use self::types::ObservationDelivery; +use crate::TaskFailureHandler; use crate::runtime::PendingRuntimeMode; use crate::runtime::RuntimeCommand; use crate::runtime::RuntimeControlCommand; @@ -50,6 +51,7 @@ impl CellActor { host: Arc, initial_observe_mode: ObserveMode, cell_state: Arc, + task_failure_handler: Option, ) -> Result< ( CellHandle, @@ -66,6 +68,7 @@ impl CellActor { runtime_request(request), event_tx, PendingRuntimeMode::PauseUntilResumed, + task_failure_handler.clone(), )?; let handle = CellHandle::new(command_tx, Arc::clone(&cell_state)); let task = run_cell( @@ -82,6 +85,7 @@ impl CellActor { mode: initial_observe_mode, response_tx: initial_response_tx, }, + task_failure_handler, ); let initial_response = Box::pin(async move { initial_response_rx.await.unwrap_or(Err(CellError::Closed)) }); @@ -107,6 +111,7 @@ async fn run_cell( mut event_rx: mpsc::UnboundedReceiver, command_rx: mpsc::UnboundedReceiver, initial_observer: Observer, + task_failure_handler: Option, ) { let CellContext { runtime_tx, @@ -123,6 +128,7 @@ async fn run_cell( let mut termination = false; let mut runtime_closed = false; let mut runtime_paused = false; + let mut runtime_failure_reported = false; let mut yield_timer: Option>> = None; let mut notification_tasks = JoinSet::new(); let mut tool_tasks = JoinSet::new(); @@ -149,6 +155,7 @@ async fn run_cell( &mut notification_tasks, &mut tool_tasks, CallbackCompletion::Cancel, + task_failure_handler.as_ref(), ).await; finish_termination( &cell_state, @@ -259,6 +266,7 @@ async fn run_cell( 
&mut notification_tasks, &mut tool_tasks, CallbackCompletion::Cancel, + task_failure_handler.as_ref(), ).await; finish_termination( &cell_state, @@ -269,11 +277,20 @@ async fn run_cell( ); break; } + if !runtime_failure_reported + && let Some(task_failure_handler) = &task_failure_handler + { + runtime_failure_reported = true; + task_failure_handler( + "code-mode V8 runtime thread ended unexpectedly".to_string(), + ); + } finish_callbacks( &callback_cancellation_token, &mut notification_tasks, &mut tool_tasks, CallbackCompletion::DrainNotifications, + task_failure_handler.as_ref(), ) .await; let event = CellEvent::Completed { @@ -376,6 +393,7 @@ async fn run_cell( call_id, text, callback_cancellation_token.child_token(), + task_failure_handler.clone(), ); } RuntimeEvent::ToolCall { id, name, kind, input } => { @@ -394,6 +412,7 @@ async fn run_cell( }, runtime_tx.clone(), callback_cancellation_token.child_token(), + task_failure_handler.clone(), ); } RuntimeEvent::Result { stored_value_writes, error_text } => { @@ -405,6 +424,7 @@ async fn run_cell( &mut notification_tasks, &mut tool_tasks, CallbackCompletion::Cancel, + task_failure_handler.as_ref(), ).await; finish_termination( &cell_state, @@ -420,6 +440,7 @@ async fn run_cell( &mut notification_tasks, &mut tool_tasks, CallbackCompletion::DrainNotifications, + task_failure_handler.as_ref(), ) .await; let event = CellEvent::Completed { @@ -455,13 +476,20 @@ async fn run_cell( } } } + RuntimeEvent::ThreadPanicked => { + runtime_failure_reported = true; + } } } task_result = notification_tasks.join_next(), if !notification_tasks.is_empty() => { - log_task_result(task_result, "notification"); + report_task_result( + task_result, + "notification", + task_failure_handler.as_ref(), + ); } task_result = tool_tasks.join_next(), if !tool_tasks.is_empty() => { - log_task_result(task_result, "tool"); + report_task_result(task_result, "tool", task_failure_handler.as_ref()); } } } @@ -479,6 +507,7 @@ async fn run_cell( &mut 
notification_tasks, &mut tool_tasks, CallbackCompletion::Cancel, + task_failure_handler.as_ref(), ) .await; host.closed().await; diff --git a/codex-rs/code-mode/src/cell_actor/tests.rs b/codex-rs/code-mode/src/cell_actor/tests.rs index a4d93fc3e..3612daff9 100644 --- a/codex-rs/code-mode/src/cell_actor/tests.rs +++ b/codex-rs/code-mode/src/cell_actor/tests.rs @@ -102,6 +102,18 @@ fn spawn_cell_actor_harness(initial_observe_mode: ObserveMode) -> CellActorHarne fn spawn_cell_actor_harness_with_host( initial_observe_mode: ObserveMode, host: Arc, +) -> CellActorHarness { + spawn_cell_actor_harness_with_host_and_failure_handler( + initial_observe_mode, + host, + /*task_failure_handler*/ None, + ) +} + +fn spawn_cell_actor_harness_with_host_and_failure_handler( + initial_observe_mode: ObserveMode, + host: Arc, + task_failure_handler: Option, ) -> CellActorHarness { let (event_tx, event_rx) = mpsc::unbounded_channel(); let (command_tx, command_rx) = mpsc::unbounded_channel(); @@ -118,6 +130,7 @@ fn spawn_cell_actor_harness_with_host( }, runtime_event_tx, PendingRuntimeMode::PauseUntilResumed, + /*task_failure_handler*/ None, ) .unwrap(); let (runtime_control_tx, runtime_control_rx) = std_mpsc::channel(); @@ -137,6 +150,7 @@ fn spawn_cell_actor_harness_with_host( mode: initial_observe_mode, response_tx: initial_event_tx, }, + task_failure_handler, )); CellActorHarness { @@ -149,6 +163,54 @@ fn spawn_cell_actor_harness_with_host( } } +#[tokio::test] +async fn unexpected_runtime_thread_exit_is_reported_to_the_session_owner() { + let (failure_tx, mut failure_rx) = mpsc::unbounded_channel(); + let harness = spawn_cell_actor_harness_with_host_and_failure_handler( + ObserveMode::YieldAfter(Duration::from_secs(60)), + Arc::new(TestHost), + Some(Arc::new(move |reason| { + let _ = failure_tx.send(reason); + })), + ); + drop(harness.event_tx); + + assert_eq!( + tokio::time::timeout(Duration::from_secs(1), failure_rx.recv()) + .await + .expect("runtime failure timeout") + 
.expect("runtime failure"), + "code-mode V8 runtime thread ended unexpectedly" + ); + assert!( + harness + .initial_event_rx + .await + .expect("initial event") + .is_ok() + ); + harness.task.await.expect("cell task"); +} + +#[tokio::test] +async fn runtime_thread_panic_remains_a_cell_error_without_owner_supervision() { + let harness = spawn_cell_actor_harness(ObserveMode::YieldAfter(Duration::from_secs(60))); + harness + .event_tx + .send(RuntimeEvent::ThreadPanicked) + .expect("runtime panic event"); + drop(harness.event_tx); + + assert_eq!( + harness.initial_event_rx.await.expect("initial event"), + Ok(CellEvent::Completed { + content_items: Vec::new(), + error_text: Some("exec runtime ended unexpectedly".to_string()), + }) + ); + harness.task.await.expect("cell task"); +} + async fn wait_for_notification(host: &RecordingHost) { tokio::time::timeout(Duration::from_secs(1), async { while !host.notified.load(Ordering::Acquire) { diff --git a/codex-rs/code-mode/src/lib.rs b/codex-rs/code-mode/src/lib.rs index 089bb6f78..60c02e4fa 100644 --- a/codex-rs/code-mode/src/lib.rs +++ b/codex-rs/code-mode/src/lib.rs @@ -3,6 +3,8 @@ mod runtime; mod service; mod session_runtime; +pub(crate) type TaskFailureHandler = std::sync::Arc; + pub use codex_code_mode_protocol::*; pub use service::InProcessCodeModeSession; pub use service::InProcessCodeModeSessionProvider; diff --git a/codex-rs/code-mode/src/runtime/mod.rs b/codex-rs/code-mode/src/runtime/mod.rs index 42af5c0b9..0b553123f 100644 --- a/codex-rs/code-mode/src/runtime/mod.rs +++ b/codex-rs/code-mode/src/runtime/mod.rs @@ -5,6 +5,8 @@ mod timers; mod value; use std::collections::HashMap; +use std::panic::AssertUnwindSafe; +use std::panic::catch_unwind; use std::sync::OnceLock; use std::sync::mpsc as std_mpsc; use std::thread; @@ -18,6 +20,8 @@ use codex_protocol::ToolName; use serde_json::Value as JsonValue; use tokio::sync::mpsc; +use crate::TaskFailureHandler; + const EXIT_SENTINEL: &str = "__codex_code_mode_exit__"; 
#[derive(Debug)] @@ -63,6 +67,7 @@ pub(crate) enum RuntimeEvent { stored_value_writes: HashMap, error_text: Option, }, + ThreadPanicked, } pub(crate) fn spawn_runtime( @@ -70,6 +75,7 @@ pub(crate) fn spawn_runtime( request: ExecuteRequest, event_tx: mpsc::UnboundedSender, pending_mode: PendingRuntimeMode, + task_failure_handler: Option, ) -> Result< ( std_mpsc::Sender, @@ -96,7 +102,7 @@ pub(crate) fn spawn_runtime( stored_values, }; - thread::spawn(move || { + spawn_supervised_runtime_thread(event_tx.clone(), task_failure_handler, move || { run_runtime( config, event_tx, @@ -114,6 +120,21 @@ pub(crate) fn spawn_runtime( Ok((command_tx, control_tx, isolate_handle)) } +fn spawn_supervised_runtime_thread( + event_tx: mpsc::UnboundedSender, + task_failure_handler: Option, + runtime: impl FnOnce() + Send + 'static, +) { + thread::spawn(move || { + if catch_unwind(AssertUnwindSafe(runtime)).is_err() { + if let Some(task_failure_handler) = task_failure_handler { + task_failure_handler("code-mode V8 runtime thread panicked".to_string()); + } + let _ = event_tx.send(RuntimeEvent::ThreadPanicked); + } + }); +} + #[derive(Clone)] struct RuntimeConfig { tool_call_id: String, @@ -336,6 +357,7 @@ mod tests { use super::RuntimeControlCommand; use super::RuntimeEvent; use super::spawn_runtime; + use super::spawn_supervised_runtime_thread; use crate::FunctionCallOutputContentItem; fn execute_request(source: &str) -> ExecuteRequest { @@ -348,6 +370,45 @@ mod tests { } } + #[tokio::test] + async fn runtime_thread_panic_before_initialization_is_reported_directly() { + let (event_tx, event_rx) = mpsc::unbounded_channel(); + drop(event_rx); + let (failure_tx, mut failure_rx) = mpsc::unbounded_channel(); + spawn_supervised_runtime_thread( + event_tx, + Some(std::sync::Arc::new(move |reason| { + let _ = failure_tx.send(reason); + })), + || panic!("runtime thread panic probe"), + ); + + assert_eq!( + tokio::time::timeout(Duration::from_secs(1), failure_rx.recv()) + .await + 
.expect("runtime failure timeout") + .expect("runtime failure"), + "code-mode V8 runtime thread panicked" + ); + } + + #[tokio::test] + async fn runtime_thread_panic_is_forwarded_without_owner_supervision() { + let (event_tx, mut event_rx) = mpsc::unbounded_channel(); + spawn_supervised_runtime_thread( + event_tx, + /*task_failure_handler*/ None, + || panic!("runtime thread panic probe"), + ); + + assert!(matches!( + tokio::time::timeout(Duration::from_secs(1), event_rx.recv()) + .await + .expect("runtime panic event timeout"), + Some(RuntimeEvent::ThreadPanicked) + )); + } + #[tokio::test] async fn terminate_execution_stops_cpu_bound_module() { let (event_tx, mut event_rx) = mpsc::unbounded_channel(); @@ -356,6 +417,7 @@ mod tests { execute_request("while (true) {}"), event_tx, PendingRuntimeMode::Continue, + /*task_failure_handler*/ None, ) .unwrap(); @@ -398,6 +460,7 @@ await new Promise(() => {}); ), event_tx, PendingRuntimeMode::PauseUntilResumed, + /*task_failure_handler*/ None, ) .unwrap(); diff --git a/codex-rs/code-mode/src/service.rs b/codex-rs/code-mode/src/service.rs index 57400001f..8c5b94bb7 100644 --- a/codex-rs/code-mode/src/service.rs +++ b/codex-rs/code-mode/src/service.rs @@ -87,6 +87,18 @@ impl InProcessCodeModeSession { } } + pub fn with_delegate_and_task_failure_handler( + delegate: Arc, + task_failure_handler: Arc, + ) -> Self { + Self { + runtime: SessionRuntime::new_with_task_failure_handler( + Arc::new(ProtocolDelegate { delegate }), + Some(task_failure_handler), + ), + } + } + pub async fn execute(&self, request: ExecuteRequest) -> Result { let yield_time_ms = request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS); let started = self diff --git a/codex-rs/code-mode/src/session_runtime/mod.rs b/codex-rs/code-mode/src/session_runtime/mod.rs index 5688c0430..cc014230f 100644 --- a/codex-rs/code-mode/src/session_runtime/mod.rs +++ b/codex-rs/code-mode/src/session_runtime/mod.rs @@ -24,6 +24,7 @@ pub(crate) use 
self::types::SessionRuntimeDelegate; pub(crate) use self::types::ToolDefinition; pub(crate) use self::types::ToolKind; pub(crate) use self::types::ToolName; +use crate::TaskFailureHandler; use crate::cell_actor::CellActor; use crate::cell_actor::CellError; use crate::cell_actor::CellEventFuture; @@ -46,11 +47,19 @@ struct Inner { cell_tasks: TaskTracker, shutdown_token: CancellationToken, delegate: Arc, + task_failure_handler: Option, next_cell_id: AtomicU64, } impl SessionRuntime { pub(crate) fn new(delegate: Arc) -> Self { + Self::new_with_task_failure_handler(delegate, /*task_failure_handler*/ None) + } + + pub(crate) fn new_with_task_failure_handler( + delegate: Arc, + task_failure_handler: Option, + ) -> Self { Self { inner: Arc::new(Inner { stored_values: Mutex::new(HashMap::new()), @@ -58,6 +67,7 @@ impl SessionRuntime { cell_tasks: TaskTracker::new(), shutdown_token: CancellationToken::new(), delegate, + task_failure_handler, next_cell_id: AtomicU64::new(1), }), } @@ -71,7 +81,7 @@ impl SessionRuntime { if self.inner.shutdown_token.is_cancelled() { return Err(Error::ShuttingDown); } - let cell_id = self.allocate_cell_id(); + let cell_id = self.allocate_cell_id()?; let initial_event = self .start_cell(cell_id.clone(), request, initial_observe_mode) .await?; @@ -133,13 +143,14 @@ impl SessionRuntime { Ok(()) } - fn allocate_cell_id(&self) -> CellId { - CellId::new( - self.inner - .next_cell_id - .fetch_add(1, Ordering::Relaxed) - .to_string(), - ) + fn allocate_cell_id(&self) -> Result { + self.inner + .next_cell_id + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |next_cell_id| { + next_cell_id.checked_add(1) + }) + .map(|cell_id| CellId::new(cell_id.to_string())) + .map_err(|_| Error::CellIdSpaceExhausted) } async fn start_cell( @@ -167,10 +178,21 @@ impl SessionRuntime { host, initial_observe_mode, cell_state, + self.inner.task_failure_handler.clone(), ) .map_err(Error::Runtime)?; cells.insert(cell_id.clone(), handle); - 
self.inner.cell_tasks.spawn(task); + let task = self.inner.cell_tasks.spawn(task); + if let Some(task_failure_handler) = self.inner.task_failure_handler.clone() { + let failed_cell_id = cell_id.clone(); + let _failure_watcher = self.inner.cell_tasks.spawn(async move { + if let Err(err) = task.await { + task_failure_handler(format!( + "code-mode cell {failed_cell_id} task failed: {err}" + )); + } + }); + } drop(cells); Ok(map_actor_event(cell_id, initial_event)) } diff --git a/codex-rs/code-mode/src/session_runtime/tests.rs b/codex-rs/code-mode/src/session_runtime/tests.rs index 969f06a22..1d90179bb 100644 --- a/codex-rs/code-mode/src/session_runtime/tests.rs +++ b/codex-rs/code-mode/src/session_runtime/tests.rs @@ -15,6 +15,8 @@ use crate::cell_actor::CompletionCommit; struct RecordingDelegate; +struct PanickingClosedDelegate; + impl SessionRuntimeDelegate for RecordingDelegate { async fn invoke_tool( &self, @@ -37,6 +39,62 @@ impl SessionRuntimeDelegate for RecordingDelegate { fn cell_closed(&self, _cell_id: &CellId) {} } +impl SessionRuntimeDelegate for PanickingClosedDelegate { + async fn invoke_tool( + &self, + _invocation: NestedToolCall, + _cancellation_token: CancellationToken, + ) -> Result { + Ok(JsonValue::Null) + } + + async fn notify( + &self, + _call_id: String, + _cell_id: CellId, + _text: String, + _cancellation_token: CancellationToken, + ) -> Result<(), String> { + Ok(()) + } + + fn cell_closed(&self, _cell_id: &CellId) { + panic!("cell close panic probe"); + } +} + +#[tokio::test] +async fn reports_cell_actor_panics_to_the_owner() { + let (failure_tx, mut failure_rx) = tokio::sync::mpsc::unbounded_channel(); + let runtime = SessionRuntime::new_with_task_failure_handler( + Arc::new(PanickingClosedDelegate), + Some(Arc::new(move |reason| { + let _ = failure_tx.send(reason); + })), + ); + let started = runtime + .execute( + execute_request(r#"text("done");"#), + ObserveMode::YieldAfter(Duration::from_secs(1)), + ) + .await + .expect("start cell"); + 
assert_eq!( + started.initial_event().await, + Ok(CellEvent::Completed { + content_items: vec![OutputItem::Text { + text: "done".to_string(), + }], + error_text: None, + }) + ); + runtime.shutdown().await.expect("shutdown runtime"); + let failure = failure_rx + .try_recv() + .expect("shutdown should wait for the cell failure watcher"); + assert!(failure.contains("code-mode cell 1 task failed")); +} + #[tokio::test] async fn termination_rejects_a_waiting_store_commit_before_the_next_cell_can_load_it() { let runtime = SessionRuntime::new(Arc::new(RecordingDelegate)); @@ -118,6 +176,26 @@ fn execute_request(source: &str) -> CreateCellRequest { } } +#[tokio::test] +async fn cell_id_allocation_fails_before_wrapping() { + let runtime = SessionRuntime::new(Arc::new(RecordingDelegate)); + runtime + .inner + .next_cell_id + .store(u64::MAX, Ordering::Relaxed); + + assert_eq!( + runtime + .execute( + execute_request(r#"text("unreachable");"#), + ObserveMode::YieldAfter(Duration::from_secs(1)), + ) + .await + .err(), + Some(Error::CellIdSpaceExhausted) + ); +} + #[tokio::test] #[expect( clippy::await_holding_invalid_type, diff --git a/codex-rs/code-mode/src/session_runtime/types.rs b/codex-rs/code-mode/src/session_runtime/types.rs index b04bd0f23..a33ffa1f1 100644 --- a/codex-rs/code-mode/src/session_runtime/types.rs +++ b/codex-rs/code-mode/src/session_runtime/types.rs @@ -138,6 +138,7 @@ pub(crate) trait SessionRuntimeDelegate: Send + Sync + 'static { #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) enum Error { ShuttingDown, + CellIdSpaceExhausted, DuplicateCell(CellId), MissingCell(CellId), BusyObserver(CellId), @@ -150,6 +151,9 @@ impl fmt::Display for Error { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::ShuttingDown => formatter.write_str("code mode session is shutting down"), + Self::CellIdSpaceExhausted => { + formatter.write_str("code mode session exhausted its cell ID space") + } Self::DuplicateCell(cell_id) => 
write!(formatter, "exec cell {cell_id} already exists"), Self::MissingCell(cell_id) => write!(formatter, "exec cell {cell_id} not found"), Self::BusyObserver(cell_id) => {