diff --git a/codex-rs/app-server/src/current_time.rs b/codex-rs/app-server/src/current_time.rs index 9b76d337a..205068be2 100644 --- a/codex-rs/app-server/src/current_time.rs +++ b/codex-rs/app-server/src/current_time.rs @@ -10,6 +10,7 @@ use chrono::Utc; use codex_app_server_protocol::CurrentTimeReadParams; use codex_app_server_protocol::CurrentTimeReadResponse; use codex_app_server_protocol::ServerRequestPayload; +use codex_core::SleepFuture; use codex_core::TimeFuture; use codex_core::TimeProvider; use codex_protocol::ThreadId; @@ -49,6 +50,13 @@ impl TimeProvider for AppServerTimeProvider { request_current_time(outgoing, thread_state_manager, thread_id).await }) } + + fn sleep(&self, _thread_id: ThreadId, duration: Duration) -> SleepFuture<'_> { + Box::pin(async move { + tokio::time::sleep(duration).await; + Ok(()) + }) + } } async fn request_current_time( diff --git a/codex-rs/core/src/current_time.rs b/codex-rs/core/src/current_time.rs index 7740552e1..b2c3fadd7 100644 --- a/codex-rs/core/src/current_time.rs +++ b/codex-rs/core/src/current_time.rs @@ -1,6 +1,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use anyhow::Result; use anyhow::anyhow; @@ -12,10 +13,16 @@ use codex_protocol::ThreadId; use crate::config::CurrentTimeReminderConfig; pub type TimeFuture<'a> = Pin>> + Send + 'a>>; +pub type SleepFuture<'a> = Pin> + Send + 'a>>; -/// Host integration boundary for obtaining the current time. +/// Host integration boundary for reading and waiting on the current time. pub trait TimeProvider: Send + Sync { fn current_time(&self, thread_id: ThreadId) -> TimeFuture<'_>; + + /// Waits for the given duration on this provider's clock. + /// + /// Dropping the returned future cancels the wait. + fn sleep(&self, thread_id: ThreadId, duration: Duration) -> SleepFuture<'_>; } pub(crate) struct SystemTimeProvider; @@ -24,6 +31,13 @@ impl TimeProvider for SystemTimeProvider { fn current_time(&self, _thread_id: ThreadId) -> TimeFuture<'_> { Box::pin(async { Ok(Utc::now()) }) } + + fn sleep(&self, _thread_id: ThreadId, duration: Duration) -> SleepFuture<'_> { + Box::pin(async move { + tokio::time::sleep(duration).await; + Ok(()) + }) + } } pub(crate) fn resolve_time_provider( diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index da9195406..64148a1d4 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -189,6 +189,7 @@ pub use client_common::ResponseEvent; pub use client_common::ResponseStream; pub use codex_prompts::REVIEW_PROMPT; pub use compact::content_items_to_text; +pub use current_time::SleepFuture; pub use current_time::TimeFuture; pub use current_time::TimeProvider; pub use event_mapping::parse_turn_item; diff --git a/codex-rs/core/src/tools/handlers/sleep.rs b/codex-rs/core/src/tools/handlers/sleep.rs index 51888227a..acb45ca90 100644 --- a/codex-rs/core/src/tools/handlers/sleep.rs +++ b/codex-rs/core/src/tools/handlers/sleep.rs @@ -21,7 +21,7 @@ use std::time::Instant; const NAMESPACE: &str = "clock"; const TOOL_NAME: &str = "sleep"; -const MAX_SLEEP_DURATION_MS: u64 = 3_600_000; +const MAX_SLEEP_DURATION_MS: u64 = 12 * 60 * 60 * 1000; pub struct SleepHandler; @@ -102,24 +102,36 @@ impl ToolExecutor for SleepHandler { .input_queue .subscribe_activity(turn_state.as_deref()) .await; - let interrupted = if pending_activity.is_some() { - true + let sleep_result: Result = if pending_activity.is_some() { + Ok(true) } else { - let sleep = tokio::time::sleep(Duration::from_millis(args.duration_ms)); + let sleep = session + .services + .time_provider + .sleep(session.thread_id, Duration::from_millis(args.duration_ms)); tokio::pin!(sleep); tokio::select! { - () = &mut sleep => false, + result = &mut sleep => result + .map(|()| false) + .map_err(|err| { + FunctionCallError::Fatal(format!("failed to sleep: {err:#}")) + }), result = activity_rx.changed() => { if result.is_ok() { - true + Ok(true) } else { - sleep.await; - false + sleep + .await + .map(|()| false) + .map_err(|err| { + FunctionCallError::Fatal(format!("failed to sleep: {err:#}")) + }) } } } }; session.emit_turn_item_completed(turn.as_ref(), item).await; + let interrupted = sleep_result?; let message = if interrupted { "Sleep interrupted by new input." diff --git a/codex-rs/core/tests/suite/current_time_reminder.rs b/codex-rs/core/tests/suite/current_time_reminder.rs index 22634fd8e..8ebf5d6b1 100644 --- a/codex-rs/core/tests/suite/current_time_reminder.rs +++ b/codex-rs/core/tests/suite/current_time_reminder.rs @@ -1,11 +1,14 @@ use std::sync::Arc; use std::sync::atomic::AtomicI64; +use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; +use std::time::Duration; use anyhow::Result; use anyhow::anyhow; use chrono::DateTime; use chrono::Utc; +use codex_core::SleepFuture; use codex_core::TimeFuture; use codex_core::TimeProvider; use codex_core::config::CurrentTimeReminderConfig; @@ -40,22 +43,34 @@ const SECOND_REMINDER: &str = "It is 2026-06-17 17:35:15 UTC."; const THIRD_REMINDER: &str = "It is 2026-06-17 17:36:15 UTC."; const FIRST_TIME_UNIX_SECONDS: i64 = 1_781_717_655; -struct TestTimeProvider(AtomicI64); +struct TestTimeProvider { + current_time: AtomicI64, + sleep_seconds: AtomicU64, +} impl Default for TestTimeProvider { fn default() -> Self { - Self(AtomicI64::new(FIRST_TIME_UNIX_SECONDS)) + Self { + current_time: AtomicI64::new(FIRST_TIME_UNIX_SECONDS), + sleep_seconds: AtomicU64::new(0), + } } } impl TimeProvider for TestTimeProvider { fn current_time(&self, _thread_id: ThreadId) -> TimeFuture<'_> { - let timestamp = self.0.fetch_add(60, Ordering::Relaxed); + let timestamp = self.current_time.fetch_add(60, Ordering::Relaxed); Box::pin(async move { Ok(DateTime::::from_timestamp(timestamp, 0) .expect("test timestamp should be valid")) }) } + + fn sleep(&self, _thread_id: ThreadId, duration: Duration) -> SleepFuture<'_> { + self.sleep_seconds + .store(duration.as_secs(), Ordering::Relaxed); + Box::pin(async { Ok(()) }) + } } struct FailingTimeProvider; @@ -64,6 +79,10 @@ impl TimeProvider for FailingTimeProvider { fn current_time(&self, _thread_id: ThreadId) -> TimeFuture<'_> { Box::pin(async { Err(anyhow!("test clock unavailable")) }) } + + fn sleep(&self, _thread_id: ThreadId, _duration: Duration) -> SleepFuture<'_> { + Box::pin(async { Err(anyhow!("test clock unavailable")) }) + } } fn current_time_reminders(request: &ResponsesRequest) -> Vec { @@ -325,3 +344,67 @@ async fn current_time_tool_returns_the_latest_time() -> Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn sleep_tool_uses_configured_time_provider() -> Result<()> { + skip_if_no_network!(Ok(())); + + const CALL_ID: &str = "sleep"; + const DURATION_MS: u64 = 12 * 60 * 60 * 1000; + + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call_with_namespace( + CALL_ID, + "clock", + "sleep", + &json!({ "duration_ms": DURATION_MS }).to_string(), + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-2", "done"), + ev_completed("resp-2"), + ]), + ], + ) + .await; + let time_provider = Arc::new(TestTimeProvider::default()); + let test = test_codex() + .with_config(|config| { + enable_current_time_reminder( + config, + /*interval*/ 3_000, + CurrentTimeSource::External, + ); + config + .current_time_reminder + .as_mut() + .expect("current-time reminder config should be present") + .sleep_tool = true; + }) + .with_external_time_provider(time_provider.clone()) + .build(&server) + .await?; + + test.submit_turn("sleep").await?; + + assert_eq!( + time_provider.sleep_seconds.load(Ordering::Relaxed), + DURATION_MS / 1_000 + ); + let requests = responses.requests(); + assert_eq!(requests.len(), 2); + assert!( + requests[1] + .function_call_output_text(CALL_ID) + .is_some_and(|output| output.ends_with("Sleep completed.")) + ); + + Ok(()) +}