diff --git a/codex-rs/code-mode/src/description.rs b/codex-rs/code-mode/src/description.rs index 3a87fef09..76ede634b 100644 --- a/codex-rs/code-mode/src/description.rs +++ b/codex-rs/code-mode/src/description.rs @@ -26,6 +26,8 @@ const EXEC_DESCRIPTION_TEMPLATE: &str = r#"Run JavaScript code to orchestrate/co - `store(key: string, value: any)`: stores a serializable value under a string key for later `exec` calls in the same session. - `load(key: string)`: returns the stored value for a string key, or `undefined` if it is missing. - `notify(value: string | number | boolean | undefined | null)`: immediately injects an extra `custom_tool_call_output` for the current `exec` call. Values are stringified like `text(...)`. +- `setTimeout(callback: () => void, delayMs?: number)`: schedules a callback to run later and returns a timeout id. Pending timeouts do not keep `exec` alive by themselves; await an explicit promise if you need to wait for one. +- `clearTimeout(timeoutId?: number)`: cancels a timeout created by `setTimeout`. - `ALL_TOOLS`: metadata for the enabled nested tools as `{ name, description }` entries. - `yield_control()`: yields the accumulated output to the model immediately while the script keeps running."#; const WAIT_DESCRIPTION_TEMPLATE: &str = r#"- Use `wait` only after `exec` returns `Script running with cell ID ...`. @@ -556,4 +558,11 @@ mod tests { ); assert!(description.contains("### `foo` (`foo`)")); } + + #[test] + fn exec_description_mentions_timeout_helpers() { + let description = build_exec_tool_description(&[], /*code_mode_only*/ false); + assert!(description.contains("`setTimeout(callback: () => void, delayMs?: number)`")); + assert!(description.contains("`clearTimeout(timeoutId?: number)`")); + } } diff --git a/codex-rs/code-mode/src/runtime/callbacks.rs b/codex-rs/code-mode/src/runtime/callbacks.rs index b77ae82d6..5511baca2 100644 --- a/codex-rs/code-mode/src/runtime/callbacks.rs +++ b/codex-rs/code-mode/src/runtime/callbacks.rs @@ -3,6 +3,7 @@ use crate::response::FunctionCallOutputContentItem; use super::EXIT_SENTINEL; use super::RuntimeEvent; use super::RuntimeState; +use super::timers; use super::value::json_to_v8; use super::value::normalize_output_image; use super::value::serialize_output_text; @@ -185,6 +186,35 @@ pub(super) fn notify_callback( retval.set(v8::undefined(scope).into()); } +pub(super) fn set_timeout_callback( + scope: &mut v8::PinScope<'_, '_>, + args: v8::FunctionCallbackArguments, + mut retval: v8::ReturnValue, +) { + let timeout_id = match timers::schedule_timeout(scope, args) { + Ok(timeout_id) => timeout_id, + Err(error_text) => { + throw_type_error(scope, &error_text); + return; + } + }; + + retval.set(v8::Number::new(scope, timeout_id as f64).into()); +} + +pub(super) fn clear_timeout_callback( + scope: &mut v8::PinScope<'_, '_>, + args: v8::FunctionCallbackArguments, + mut retval: v8::ReturnValue, +) { + if let Err(error_text) = timers::clear_timeout(scope, args) { + throw_type_error(scope, &error_text); + return; + } + + retval.set(v8::undefined(scope).into()); +} + pub(super) fn yield_control_callback( scope: &mut v8::PinScope<'_, '_>, _args: v8::FunctionCallbackArguments, diff --git a/codex-rs/code-mode/src/runtime/globals.rs b/codex-rs/code-mode/src/runtime/globals.rs index 371479497..2d419db90 100644 --- a/codex-rs/code-mode/src/runtime/globals.rs +++ b/codex-rs/code-mode/src/runtime/globals.rs @@ -1,8 +1,10 @@ use super::RuntimeState; +use super::callbacks::clear_timeout_callback; use super::callbacks::exit_callback; use super::callbacks::image_callback; use super::callbacks::load_callback; use super::callbacks::notify_callback; +use super::callbacks::set_timeout_callback; use super::callbacks::store_callback; use super::callbacks::text_callback; use super::callbacks::tool_callback; @@ -18,6 +20,8 @@ pub(super) fn install_globals(scope: &mut v8::PinScope<'_, '_>) -> Result<(), St let tools = build_tools_object(scope)?; let all_tools = build_all_tools_value(scope)?; + let clear_timeout = helper_function(scope, "clearTimeout", clear_timeout_callback)?; + let set_timeout = helper_function(scope, "setTimeout", set_timeout_callback)?; let text = helper_function(scope, "text", text_callback)?; let image = helper_function(scope, "image", image_callback)?; let store = helper_function(scope, "store", store_callback)?; @@ -28,6 +32,8 @@ pub(super) fn install_globals(scope: &mut v8::PinScope<'_, '_>) -> Result<(), St set_global(scope, global, "tools", tools.into())?; set_global(scope, global, "ALL_TOOLS", all_tools)?; + set_global(scope, global, "clearTimeout", clear_timeout.into())?; + set_global(scope, global, "setTimeout", set_timeout.into())?; set_global(scope, global, "text", text.into())?; set_global(scope, global, "image", image.into())?; set_global(scope, global, "store", store.into())?; diff --git a/codex-rs/code-mode/src/runtime/mod.rs b/codex-rs/code-mode/src/runtime/mod.rs index df90eda67..411f81bdd 100644 --- a/codex-rs/code-mode/src/runtime/mod.rs +++ b/codex-rs/code-mode/src/runtime/mod.rs @@ -1,6 +1,7 @@ mod callbacks; mod globals; mod module_loader; +mod timers; mod value; use std::collections::HashMap; @@ -75,6 +76,7 @@ pub(crate) enum TurnMessage { pub(crate) enum RuntimeCommand { ToolResponse { id: String, result: JsonValue }, ToolError { id: String, error_text: String }, + TimeoutFired { id: u64 }, Terminate, } @@ -103,6 +105,7 @@ pub(crate) fn spawn_runtime( event_tx: mpsc::UnboundedSender, ) -> Result<(std_mpsc::Sender, v8::IsolateHandle), String> { let (command_tx, command_rx) = std_mpsc::channel(); + let runtime_command_tx = command_tx.clone(); let (isolate_handle_tx, isolate_handle_rx) = std_mpsc::sync_channel(1); let enabled_tools = request .enabled_tools @@ -117,7 +120,13 @@ pub(crate) fn spawn_runtime( }; thread::spawn(move || { - run_runtime(config, event_tx, command_rx, isolate_handle_tx); + run_runtime( + config, + event_tx, + command_rx, + isolate_handle_tx, + runtime_command_tx, + ); }); let isolate_handle = isolate_handle_rx @@ -137,10 +146,13 @@ struct RuntimeConfig { pub(super) struct RuntimeState { event_tx: mpsc::UnboundedSender, pending_tool_calls: HashMap>, + pending_timeouts: HashMap, stored_values: HashMap, enabled_tools: Vec, next_tool_call_id: u64, + next_timeout_id: u64, tool_call_id: String, + runtime_command_tx: std_mpsc::Sender, exit_requested: bool, } @@ -168,6 +180,7 @@ fn run_runtime( event_tx: mpsc::UnboundedSender, command_rx: std_mpsc::Receiver, isolate_handle_tx: std_mpsc::SyncSender, + runtime_command_tx: std_mpsc::Sender, ) { initialize_v8(); @@ -185,10 +198,13 @@ fn run_runtime( scope.set_slot(RuntimeState { event_tx: event_tx.clone(), pending_tool_calls: HashMap::new(), + pending_timeouts: HashMap::new(), stored_values: config.stored_values, enabled_tools: config.enabled_tools, next_tool_call_id: 1, + next_timeout_id: 1, tool_call_id: config.tool_call_id, + runtime_command_tx, exit_requested: false, }); @@ -223,6 +239,7 @@ fn run_runtime( let Ok(command) = command_rx.recv() else { break; }; + match command { RuntimeCommand::Terminate => break, RuntimeCommand::ToolResponse { id, result } => { @@ -241,6 +258,12 @@ fn run_runtime( return; } } + RuntimeCommand::TimeoutFired { id } => { + if let Err(runtime_error) = timers::invoke_timeout_callback(scope, id) { + capture_scope_send_error(scope, &event_tx, Some(runtime_error)); + return; + } + } } scope.perform_microtask_checkpoint(); diff --git a/codex-rs/code-mode/src/runtime/timers.rs b/codex-rs/code-mode/src/runtime/timers.rs new file mode 100644 index 000000000..01c414cef --- /dev/null +++ b/codex-rs/code-mode/src/runtime/timers.rs @@ -0,0 +1,114 @@ +use std::thread; +use std::time::Duration; + +use super::RuntimeCommand; +use super::RuntimeState; +use super::value::value_to_error_text; + +pub(super) struct ScheduledTimeout { + callback: v8::Global, +} + +pub(super) fn schedule_timeout( + scope: &mut v8::PinScope<'_, '_>, + args: v8::FunctionCallbackArguments, +) -> Result { + let callback = args.get(0); + if !callback.is_function() { + return Err("setTimeout expects a function callback".to_string()); + } + let callback = v8::Local::::try_from(callback) + .map_err(|_| "setTimeout expects a function callback".to_string())?; + + let delay_ms = args + .get(1) + .number_value(scope) + .map(normalize_delay_ms) + .unwrap_or(0); + + let callback = v8::Global::new(scope, callback); + let state = scope + .get_slot_mut::() + .ok_or_else(|| "runtime state unavailable".to_string())?; + let timeout_id = state.next_timeout_id; + state.next_timeout_id = state.next_timeout_id.saturating_add(1); + let runtime_command_tx = state.runtime_command_tx.clone(); + state + .pending_timeouts + .insert(timeout_id, ScheduledTimeout { callback }); + thread::spawn(move || { + thread::sleep(Duration::from_millis(delay_ms)); + let _ = runtime_command_tx.send(RuntimeCommand::TimeoutFired { id: timeout_id }); + }); + + Ok(timeout_id) +} + +pub(super) fn clear_timeout( + scope: &mut v8::PinScope<'_, '_>, + args: v8::FunctionCallbackArguments, +) -> Result<(), String> { + let Some(timeout_id) = timeout_id_from_args(scope, args)? else { + return Ok(()); + }; + + let Some(state) = scope.get_slot_mut::() else { + return Err("runtime state unavailable".to_string()); + }; + state.pending_timeouts.remove(&timeout_id); + Ok(()) +} + +pub(super) fn invoke_timeout_callback( + scope: &mut v8::PinScope<'_, '_>, + timeout_id: u64, +) -> Result<(), String> { + let callback = { + let state = scope + .get_slot_mut::() + .ok_or_else(|| "runtime state unavailable".to_string())?; + state.pending_timeouts.remove(&timeout_id) + }; + let Some(callback) = callback else { + return Ok(()); + }; + + let tc = std::pin::pin!(v8::TryCatch::new(scope)); + let mut tc = tc.init(); + let callback = v8::Local::new(&tc, &callback.callback); + let receiver = v8::undefined(&tc).into(); + let _ = callback.call(&tc, receiver, &[]); + if tc.has_caught() { + return Err(tc + .exception() + .map(|exception| value_to_error_text(&mut tc, exception)) + .unwrap_or_else(|| "unknown code mode exception".to_string())); + } + + Ok(()) +} +fn timeout_id_from_args( + scope: &mut v8::PinScope<'_, '_>, + args: v8::FunctionCallbackArguments, +) -> Result, String> { + if args.length() == 0 || args.get(0).is_null_or_undefined() { + return Ok(None); + } + + let Some(timeout_id) = args.get(0).number_value(scope) else { + return Err("clearTimeout expects a numeric timeout id".to_string()); + }; + if !timeout_id.is_finite() || timeout_id <= 0.0 { + return Ok(None); + } + + Ok(Some(timeout_id.trunc().min(u64::MAX as f64) as u64)) +} + +fn normalize_delay_ms(delay_ms: f64) -> u64 { + if !delay_ms.is_finite() || delay_ms <= 0.0 { + 0 + } else { + delay_ms.trunc().min(u64::MAX as f64) as u64 + } +} diff --git a/codex-rs/core/tests/suite/code_mode.rs b/codex-rs/core/tests/suite/code_mode.rs index 37ee27dd6..9ddc169a6 100644 --- a/codex-rs/core/tests/suite/code_mode.rs +++ b/codex-rs/core/tests/suite/code_mode.rs @@ -1626,6 +1626,34 @@ text({ json: true }); Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn code_mode_can_resume_after_set_timeout() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let (_test, second_mock) = run_code_mode_turn( + &server, + "use exec to wait for a timeout", + r#" +await new Promise((resolve) => setTimeout(resolve, 10)); +text("timer done"); +"#, + /*include_apply_patch*/ false, + ) + .await?; + + let req = second_mock.single_request(); + let (output, success) = custom_tool_output_body_and_success(&req, "call-1"); + assert_ne!( + success, + Some(false), + "exec setTimeout call failed unexpectedly: {output}" + ); + assert_eq!(output, "timer done"); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn code_mode_notify_injects_additional_exec_tool_output_into_active_context() -> Result<()> { skip_if_no_network!(Ok(())); @@ -2099,6 +2127,7 @@ text(JSON.stringify(Object.getOwnPropertyNames(globalThis).sort())); "BigInt64Array", "BigUint64Array", "Boolean", + "clearTimeout", "DataView", "Date", "DisposableStack", @@ -2161,6 +2190,7 @@ text(JSON.stringify(Object.getOwnPropertyNames(globalThis).sort())); "notify", "parseFloat", "parseInt", + "setTimeout", "store", "text", "tools", @@ -2578,3 +2608,51 @@ text(JSON.stringify(load("nb"))); Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn code_mode_can_compare_elapsed_time_around_set_timeout() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let (_test, second_mock) = run_code_mode_turn( + &server, + "measure elapsed time around setTimeout", + r#" +const start_ms = Date.now(); +await new Promise((resolve) => setTimeout(resolve, 100)); +const end_ms = Date.now(); +text(JSON.stringify({ + start_ms, + end_ms, + elapsed_ms: end_ms - start_ms, + waited_long_enough: end_ms - start_ms >= 100, +})); +"#, + /*include_apply_patch*/ false, + ) + .await?; + + let second_request = second_mock.single_request(); + let (second_output, second_success) = + custom_tool_output_body_and_success(&second_request, "call-1"); + assert_ne!( + second_success, + Some(false), + "exec compare time call failed unexpectedly: {second_output}" + ); + let compared: Value = serde_json::from_str( + &custom_tool_output_last_non_empty_text(&second_request, "call-1") + .expect("exec compare time call should emit JSON"), + )?; + let elapsed_ms = compared + .get("elapsed_ms") + .and_then(Value::as_i64) + .expect("elapsed_ms should be an integer"); + assert!( + elapsed_ms >= 100, + "expected elapsed_ms >= 100, got {elapsed_ms}" + ); + assert_eq!(compared.get("waited_long_enough"), Some(&Value::Bool(true))); + + Ok(()) +}