[codex] route sleep through time providers (#29973)

## Summary

- add a cancellable sleep operation to `TimeProvider`
- route `clock.sleep` through the configured provider
- extend the supported sleep duration to 12 hours
- complete the sleep turn item before propagating provider failures

## Why

This isolates the core clock abstraction needed by external clock
integrations. Existing system and app-server behavior remains wall-clock
based in this PR; the stacked follow-up supplies app-server sleeps from
an external clock.
This commit is contained in:
rka-oai
2026-06-24 22:17:43 -07:00
committed by GitHub
Unverified
parent 22f12568e1
commit f66d793a2d
5 changed files with 130 additions and 12 deletions
+8
View File
@@ -10,6 +10,7 @@ use chrono::Utc;
use codex_app_server_protocol::CurrentTimeReadParams;
use codex_app_server_protocol::CurrentTimeReadResponse;
use codex_app_server_protocol::ServerRequestPayload;
use codex_core::SleepFuture;
use codex_core::TimeFuture;
use codex_core::TimeProvider;
use codex_protocol::ThreadId;
@@ -49,6 +50,13 @@ impl TimeProvider for AppServerTimeProvider {
request_current_time(outgoing, thread_state_manager, thread_id).await
})
}
fn sleep(&self, _thread_id: ThreadId, duration: Duration) -> SleepFuture<'_> {
Box::pin(async move {
tokio::time::sleep(duration).await;
Ok(())
})
}
}
async fn request_current_time(
+15 -1
View File
@@ -1,6 +1,7 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use anyhow::anyhow;
@@ -12,10 +13,16 @@ use codex_protocol::ThreadId;
use crate::config::CurrentTimeReminderConfig;
pub type TimeFuture<'a> = Pin<Box<dyn Future<Output = Result<DateTime<Utc>>> + Send + 'a>>;
pub type SleepFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
/// Host integration boundary for obtaining the current time.
/// Host integration boundary for reading and waiting on the current time.
pub trait TimeProvider: Send + Sync {
fn current_time(&self, thread_id: ThreadId) -> TimeFuture<'_>;
/// Waits for the given duration on this provider's clock.
///
/// Dropping the returned future cancels the wait.
fn sleep(&self, thread_id: ThreadId, duration: Duration) -> SleepFuture<'_>;
}
pub(crate) struct SystemTimeProvider;
@@ -24,6 +31,13 @@ impl TimeProvider for SystemTimeProvider {
fn current_time(&self, _thread_id: ThreadId) -> TimeFuture<'_> {
Box::pin(async { Ok(Utc::now()) })
}
fn sleep(&self, _thread_id: ThreadId, duration: Duration) -> SleepFuture<'_> {
Box::pin(async move {
tokio::time::sleep(duration).await;
Ok(())
})
}
}
pub(crate) fn resolve_time_provider(
+1
View File
@@ -189,6 +189,7 @@ pub use client_common::ResponseEvent;
pub use client_common::ResponseStream;
pub use codex_prompts::REVIEW_PROMPT;
pub use compact::content_items_to_text;
pub use current_time::SleepFuture;
pub use current_time::TimeFuture;
pub use current_time::TimeProvider;
pub use event_mapping::parse_turn_item;
+20 -8
View File
@@ -21,7 +21,7 @@ use std::time::Instant;
const NAMESPACE: &str = "clock";
const TOOL_NAME: &str = "sleep";
const MAX_SLEEP_DURATION_MS: u64 = 3_600_000;
const MAX_SLEEP_DURATION_MS: u64 = 12 * 60 * 60 * 1000;
pub struct SleepHandler;
@@ -102,24 +102,36 @@ impl ToolExecutor<ToolInvocation> for SleepHandler {
.input_queue
.subscribe_activity(turn_state.as_deref())
.await;
let interrupted = if pending_activity.is_some() {
true
let sleep_result: Result<bool, FunctionCallError> = if pending_activity.is_some() {
Ok(true)
} else {
let sleep = tokio::time::sleep(Duration::from_millis(args.duration_ms));
let sleep = session
.services
.time_provider
.sleep(session.thread_id, Duration::from_millis(args.duration_ms));
tokio::pin!(sleep);
tokio::select! {
() = &mut sleep => false,
result = &mut sleep => result
.map(|()| false)
.map_err(|err| {
FunctionCallError::Fatal(format!("failed to sleep: {err:#}"))
}),
result = activity_rx.changed() => {
if result.is_ok() {
true
Ok(true)
} else {
sleep.await;
false
sleep
.await
.map(|()| false)
.map_err(|err| {
FunctionCallError::Fatal(format!("failed to sleep: {err:#}"))
})
}
}
}
};
session.emit_turn_item_completed(turn.as_ref(), item).await;
let interrupted = sleep_result?;
let message = if interrupted {
"Sleep interrupted by new input."
@@ -1,11 +1,14 @@
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use anyhow::Result;
use anyhow::anyhow;
use chrono::DateTime;
use chrono::Utc;
use codex_core::SleepFuture;
use codex_core::TimeFuture;
use codex_core::TimeProvider;
use codex_core::config::CurrentTimeReminderConfig;
@@ -40,22 +43,34 @@ const SECOND_REMINDER: &str = "It is 2026-06-17 17:35:15 UTC.";
const THIRD_REMINDER: &str = "It is 2026-06-17 17:36:15 UTC.";
const FIRST_TIME_UNIX_SECONDS: i64 = 1_781_717_655;
struct TestTimeProvider(AtomicI64);
struct TestTimeProvider {
current_time: AtomicI64,
sleep_seconds: AtomicU64,
}
impl Default for TestTimeProvider {
fn default() -> Self {
Self(AtomicI64::new(FIRST_TIME_UNIX_SECONDS))
Self {
current_time: AtomicI64::new(FIRST_TIME_UNIX_SECONDS),
sleep_seconds: AtomicU64::new(0),
}
}
}
impl TimeProvider for TestTimeProvider {
fn current_time(&self, _thread_id: ThreadId) -> TimeFuture<'_> {
let timestamp = self.0.fetch_add(60, Ordering::Relaxed);
let timestamp = self.current_time.fetch_add(60, Ordering::Relaxed);
Box::pin(async move {
Ok(DateTime::<Utc>::from_timestamp(timestamp, 0)
.expect("test timestamp should be valid"))
})
}
fn sleep(&self, _thread_id: ThreadId, duration: Duration) -> SleepFuture<'_> {
self.sleep_seconds
.store(duration.as_secs(), Ordering::Relaxed);
Box::pin(async { Ok(()) })
}
}
struct FailingTimeProvider;
@@ -64,6 +79,10 @@ impl TimeProvider for FailingTimeProvider {
fn current_time(&self, _thread_id: ThreadId) -> TimeFuture<'_> {
Box::pin(async { Err(anyhow!("test clock unavailable")) })
}
fn sleep(&self, _thread_id: ThreadId, _duration: Duration) -> SleepFuture<'_> {
Box::pin(async { Err(anyhow!("test clock unavailable")) })
}
}
fn current_time_reminders(request: &ResponsesRequest) -> Vec<String> {
@@ -325,3 +344,67 @@ async fn current_time_tool_returns_the_latest_time() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sleep_tool_uses_configured_time_provider() -> Result<()> {
skip_if_no_network!(Ok(()));
const CALL_ID: &str = "sleep";
const DURATION_MS: u64 = 12 * 60 * 60 * 1000;
let server = start_mock_server().await;
let responses = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call_with_namespace(
CALL_ID,
"clock",
"sleep",
&json!({ "duration_ms": DURATION_MS }).to_string(),
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "done"),
ev_completed("resp-2"),
]),
],
)
.await;
let time_provider = Arc::new(TestTimeProvider::default());
let test = test_codex()
.with_config(|config| {
enable_current_time_reminder(
config,
/*interval*/ 3_000,
CurrentTimeSource::External,
);
config
.current_time_reminder
.as_mut()
.expect("current-time reminder config should be present")
.sleep_tool = true;
})
.with_external_time_provider(time_provider.clone())
.build(&server)
.await?;
test.submit_turn("sleep").await?;
assert_eq!(
time_provider.sleep_seconds.load(Ordering::Relaxed),
DURATION_MS / 1_000
);
let requests = responses.requests();
assert_eq!(requests.len(), 2);
assert!(
requests[1]
.function_call_output_text(CALL_ID)
.is_some_and(|output| output.ends_with("Sleep completed."))
);
Ok(())
}