[codex] Add interruptible sleep tool (#28429)

## Why

Models sometimes need to pause briefly while waiting for external work,
but using a shell command for that delay ties the wait to a process and
does not naturally resume when new turn input arrives.

## What changed

- add a built-in `sleep` tool behind the under-development `sleep_tool`
feature
- accept a bounded `duration_ms` argument, matching the millisecond
convention used by unified exec
- end the sleep early when either steered user input or mailbox input
arrives
- include elapsed wall-clock time in completed and interrupted outputs
- emit a dedicated core `SleepItem` through `item/started` and
`item/completed`
- expose the sleep item as app-server v2 `ThreadItem::Sleep` and retain
it in reconstructed thread history
- regenerate the configuration schema for the new feature flag
- regenerate app-server JSON and TypeScript schema fixtures

## Test plan

- `just test -p codex-core sleep_tool_follows_feature_gate`
- `just test -p codex-core any_new_input_interrupts_sleep`
- `just test -p codex-app-server-protocol`
- `just test -p codex-app-server
sleep_emits_started_and_completed_items`
This commit is contained in:
pakrym-oai
2026-06-15 21:39:21 -07:00
committed by GitHub
Unverified
parent 022f1221e8
commit 08901fc8e1
37 changed files with 1099 additions and 19 deletions
+2
View File
@@ -384,6 +384,7 @@ impl TurnToolCounts {
| ThreadItem::Plan { .. }
| ThreadItem::Reasoning { .. }
| ThreadItem::ImageView { .. }
| ThreadItem::Sleep { .. }
| ThreadItem::EnteredReviewMode { .. }
| ThreadItem::ExitedReviewMode { .. }
| ThreadItem::ContextCompaction { .. } => return,
@@ -1620,6 +1621,7 @@ fn tracked_tool_item_id(item: &ThreadItem) -> Option<&str> {
| ThreadItem::Reasoning { .. }
| ThreadItem::SubAgentActivity { .. }
| ThreadItem::ImageView { .. }
| ThreadItem::Sleep { .. }
| ThreadItem::EnteredReviewMode { .. }
| ThreadItem::ExitedReviewMode { .. }
| ThreadItem::ContextCompaction { .. } => None,
@@ -4340,6 +4340,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -17662,6 +17662,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -15470,6 +15470,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1087,6 +1087,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1087,6 +1087,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1231,6 +1231,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1715,6 +1715,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1530,6 +1530,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1530,6 +1530,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1530,6 +1530,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1715,6 +1715,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1530,6 +1530,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1715,6 +1715,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1530,6 +1530,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1530,6 +1530,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1231,6 +1231,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1231,6 +1231,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -1231,6 +1231,32 @@
"title": "ImageViewThreadItem",
"type": "object"
},
{
"properties": {
"durationMs": {
"format": "uint64",
"minimum": 0.0,
"type": "integer"
},
"id": {
"type": "string"
},
"type": {
"enum": [
"sleep"
],
"title": "SleepThreadItemType",
"type": "string"
}
},
"required": [
"durationMs",
"id",
"type"
],
"title": "SleepThreadItem",
"type": "object"
},
{
"properties": {
"id": {
@@ -99,4 +99,4 @@ reasoningEffort: ReasoningEffort | null,
/**
* Last known status of the target agents, when available.
*/
agentsStates: { [key in string]?: CollabAgentState }, } | { "type": "subAgentActivity", id: string, kind: SubAgentActivityKind, agentThreadId: string, agentPath: string, } | { "type": "webSearch", id: string, query: string, action: WebSearchAction | null, } | { "type": "imageView", id: string, path: AbsolutePathBuf, } | { "type": "imageGeneration", id: string, status: string, revisedPrompt: string | null, result: string, savedPath?: AbsolutePathBuf, } | { "type": "enteredReviewMode", id: string, review: string, } | { "type": "exitedReviewMode", id: string, review: string, } | { "type": "contextCompaction", id: string, };
agentsStates: { [key in string]?: CollabAgentState }, } | { "type": "subAgentActivity", id: string, kind: SubAgentActivityKind, agentThreadId: string, agentPath: string, } | { "type": "webSearch", id: string, query: string, action: WebSearchAction | null, } | { "type": "imageView", id: string, path: AbsolutePathBuf, } | { "type": "sleep", id: string, durationMs: number, } | { "type": "imageGeneration", id: string, status: string, revisedPrompt: string | null, result: string, savedPath?: AbsolutePathBuf, } | { "type": "enteredReviewMode", id: string, review: string, } | { "type": "exitedReviewMode", id: string, review: string, } | { "type": "contextCompaction", id: string, };
@@ -367,6 +367,12 @@ impl ThreadHistoryBuilder {
ThreadItem::from(payload.item.clone()),
);
}
codex_protocol::items::TurnItem::Sleep(_) => {
self.upsert_item_in_turn_id(
&payload.turn_id,
ThreadItem::from(payload.item.clone()),
);
}
codex_protocol::items::TurnItem::UserMessage(_)
| codex_protocol::items::TurnItem::HookPrompt(_)
| codex_protocol::items::TurnItem::AgentMessage(_)
@@ -391,6 +397,12 @@ impl ThreadHistoryBuilder {
ThreadItem::from(payload.item.clone()),
);
}
codex_protocol::items::TurnItem::Sleep(_) => {
self.upsert_item_in_turn_id(
&payload.turn_id,
ThreadItem::from(payload.item.clone()),
);
}
codex_protocol::items::TurnItem::UserMessage(_)
| codex_protocol::items::TurnItem::HookPrompt(_)
| codex_protocol::items::TurnItem::AgentMessage(_)
@@ -1234,6 +1246,7 @@ mod tests {
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem;
use codex_protocol::items::HookPromptFragment as CoreHookPromptFragment;
use codex_protocol::items::SleepItem as CoreSleepItem;
use codex_protocol::items::TurnItem as CoreTurnItem;
use codex_protocol::items::UserMessageItem as CoreUserMessageItem;
use codex_protocol::items::build_hook_prompt_message;
@@ -1251,6 +1264,7 @@ mod tests {
use codex_protocol::protocol::DynamicToolCallResponseEvent;
use codex_protocol::protocol::ExecCommandEndEvent;
use codex_protocol::protocol::ExecCommandSource;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::McpInvocation;
use codex_protocol::protocol::McpToolCallEndEvent;
@@ -1420,7 +1434,7 @@ mod tests {
}
#[test]
fn ignores_non_plan_item_lifecycle_events() {
fn ignores_user_message_item_lifecycle_events() {
let turn_id = "turn-1";
let thread_id = ThreadId::new();
let events = vec![
@@ -1478,6 +1492,53 @@ mod tests {
);
}
#[test]
fn rebuilds_sleep_item_from_persisted_completion() {
let turn_id = "turn-1";
let thread_id = ThreadId::new();
let sleep_item = CoreTurnItem::Sleep(CoreSleepItem {
id: "sleep-1".to_string(),
duration_ms: 1_000,
});
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
trace_id: None,
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
EventMsg::ItemCompleted(ItemCompletedEvent {
thread_id,
turn_id: turn_id.to_string(),
item: sleep_item,
completed_at_ms: 1_000,
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_id.to_string(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
time_to_first_token_ms: None,
}),
];
let items = events
.into_iter()
.map(RolloutItem::EventMsg)
.collect::<Vec<_>>();
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
assert_eq!(
turns[0].items,
vec![ThreadItem::Sleep {
id: "sleep-1".to_string(),
duration_ms: 1_000,
}]
);
}
#[test]
fn preserves_user_message_client_id_from_legacy_event() {
let turn_id = "turn-1";
@@ -356,6 +356,13 @@ pub enum ThreadItem {
ImageView { id: String, path: AbsolutePathBuf },
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
Sleep {
id: String,
#[ts(type = "number")]
duration_ms: u64,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
ImageGeneration {
id: String,
status: String,
@@ -400,6 +407,7 @@ impl ThreadItem {
| ThreadItem::SubAgentActivity { id, .. }
| ThreadItem::WebSearch { id, .. }
| ThreadItem::ImageView { id, .. }
| ThreadItem::Sleep { id, .. }
| ThreadItem::ImageGeneration { id, .. }
| ThreadItem::EnteredReviewMode { id, .. }
| ThreadItem::ExitedReviewMode { id, .. }
@@ -837,6 +845,10 @@ impl From<CoreTurnItem> for ThreadItem {
id: image.id,
path: image.path,
},
CoreTurnItem::Sleep(sleep) => ThreadItem::Sleep {
id: sleep.id,
duration_ms: sleep.duration_ms,
},
CoreTurnItem::ImageGeneration(image) => ThreadItem::ImageGeneration {
id: image.id,
status: image.status,
+1
View File
@@ -1354,6 +1354,7 @@ Today both notifications carry an empty `items` array even when item events were
- `collabToolCall``{id, tool, status, senderThreadId, receiverThreadId?, newThreadId?, prompt?, agentStatus?}` describing collab tool calls (`spawn_agent`, `send_input`, `resume_agent`, `wait`, `close_agent`); `status` is `inProgress`, `completed`, or `failed`.
- `webSearch``{id, query, action?}` for a web search request issued by the agent; `action` mirrors the Responses API web_search action payload (`search`, `open_page`, `find_in_page`) and may be omitted until completion.
- `imageView``{id, path}` emitted when the agent invokes the image viewer tool.
- `sleep``{id, durationMs}` emitted while the agent waits for a duration or new input.
- `enteredReviewMode``{id, review}` sent when the reviewer starts; `review` is a short user-facing label such as `"current changes"` or the requested target description.
- `exitedReviewMode``{id, review}` emitted when the reviewer finishes; `review` is the full plain-text review (usually, overall notes plus bullet point findings).
- `contextCompaction``{id}` emitted when codex compacts the conversation history. This can happen automatically.
@@ -52,6 +52,7 @@ mod request_user_input;
mod review;
mod safety_check_downgrade;
mod skills_list;
mod sleep;
mod thread_archive;
mod thread_delete;
mod thread_fork;
+174
View File
@@ -0,0 +1,174 @@
use anyhow::Result;
use app_test_support::TestAppServer;
use app_test_support::to_response;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sleep_emits_started_and_completed_items() -> Result<()> {
const CALL_ID: &str = "sleep-1";
const DURATION_MS: u64 = 1;
let server = responses::start_mock_server().await;
responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_function_call(
CALL_ID,
"sleep",
&serde_json::json!({ "duration_ms": DURATION_MS }).to_string(),
),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-2"),
]),
],
)
.await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_start_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response(thread_start_response)?;
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Sleep briefly".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_start_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
)
.await??;
let TurnStartResponse { turn, .. } = to_response(turn_start_response)?;
let (started, completed) = timeout(DEFAULT_READ_TIMEOUT, async {
let mut started = None;
let mut completed = None;
while started.is_none() || completed.is_none() {
let JSONRPCMessage::Notification(notification) = mcp.read_next_message().await? else {
continue;
};
match notification.method.as_str() {
"item/started" => {
let payload: ItemStartedNotification =
serde_json::from_value(notification.params.expect("item/started params"))?;
if matches!(&payload.item, ThreadItem::Sleep { .. }) {
started = Some(payload);
}
}
"item/completed" => {
let payload: ItemCompletedNotification = serde_json::from_value(
notification.params.expect("item/completed params"),
)?;
if matches!(&payload.item, ThreadItem::Sleep { .. }) {
completed = Some(payload);
}
}
_ => {}
}
}
Ok::<_, anyhow::Error>((
started.expect("sleep started"),
completed.expect("sleep completed"),
))
})
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let expected_item = ThreadItem::Sleep {
id: CALL_ID.to_string(),
duration_ms: DURATION_MS,
};
assert!(completed.completed_at_ms >= started.started_at_ms);
assert_eq!(
started,
ItemStartedNotification {
item: expected_item.clone(),
thread_id: thread.id.clone(),
turn_id: turn.id.clone(),
started_at_ms: started.started_at_ms,
}
);
assert_eq!(
completed,
ItemCompletedNotification {
item: expected_item,
thread_id: thread.id,
turn_id: turn.id,
completed_at_ms: completed.completed_at_ms,
}
);
Ok(())
}
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
[features]
sleep_tool = true
"#
),
)
}
+6
View File
@@ -617,6 +617,9 @@
"skill_mcp_dependency_install": {
"type": "boolean"
},
"sleep_tool": {
"type": "boolean"
},
"sqlite": {
"type": "boolean"
},
@@ -4772,6 +4775,9 @@
"skill_mcp_dependency_install": {
"type": "boolean"
},
"sleep_tool": {
"type": "boolean"
},
"sqlite": {
"type": "boolean"
},
+2
View File
@@ -26,6 +26,7 @@ mod request_user_input;
pub(crate) mod request_user_input_spec;
mod shell;
pub(crate) mod shell_spec;
mod sleep;
mod test_sync;
pub(crate) mod test_sync_spec;
mod tool_search;
@@ -68,6 +69,7 @@ pub use request_plugin_install::RequestPluginInstallHandler;
pub use request_user_input::RequestUserInputHandler;
pub use shell::ShellCommandHandler;
pub(crate) use shell::ShellCommandHandlerOptions;
pub use sleep::SleepHandler;
pub use test_sync::TestSyncHandler;
pub(crate) use tool_search::ToolSearchHandlerCache;
pub use unified_exec::ExecCommandHandler;
+131
View File
@@ -0,0 +1,131 @@
use crate::function_tool::FunctionCallError;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::context::boxed_tool_output;
use crate::tools::handlers::parse_arguments;
use crate::tools::registry::CoreToolRuntime;
use crate::tools::registry::ToolExecutor;
use codex_protocol::items::SleepItem;
use codex_protocol::items::TurnItem;
use codex_tools::JsonSchema;
use codex_tools::ResponsesApiTool;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
use serde::Deserialize;
use std::collections::BTreeMap;
use std::time::Duration;
use std::time::Instant;
const SLEEP_TOOL_NAME: &str = "sleep";
const MAX_SLEEP_DURATION_MS: u64 = 3_600_000;
pub struct SleepHandler;
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct SleepArgs {
duration_ms: u64,
}
fn create_sleep_tool() -> ToolSpec {
let properties = BTreeMap::from([(
"duration_ms".to_string(),
JsonSchema::number(Some(format!(
"How long to sleep in milliseconds. Must be between 1 and {MAX_SLEEP_DURATION_MS}."
))),
)]);
ToolSpec::Function(ResponsesApiTool {
name: SLEEP_TOOL_NAME.to_string(),
description: "Pause execution for a specified duration. The sleep ends early when new input arrives for the active turn. Returns the elapsed wall-clock time."
.to_string(),
strict: false,
defer_loading: None,
parameters: JsonSchema::object(
properties,
Some(vec!["duration_ms".to_string()]),
/*additional_properties*/ Some(false.into()),
),
output_schema: None,
})
}
impl ToolExecutor<ToolInvocation> for SleepHandler {
fn tool_name(&self) -> ToolName {
ToolName::plain(SLEEP_TOOL_NAME)
}
fn spec(&self) -> ToolSpec {
create_sleep_tool()
}
fn handle(&self, invocation: ToolInvocation) -> codex_tools::ToolExecutorFuture<'_> {
Box::pin(async move {
let ToolInvocation {
session,
turn,
call_id,
payload,
..
} = invocation;
let ToolPayload::Function { arguments } = payload else {
return Err(FunctionCallError::RespondToModel(format!(
"{SLEEP_TOOL_NAME} handler received unsupported payload"
)));
};
let args: SleepArgs = parse_arguments(&arguments)?;
if !(1..=MAX_SLEEP_DURATION_MS).contains(&args.duration_ms) {
return Err(FunctionCallError::RespondToModel(format!(
"duration_ms must be between 1 and {MAX_SLEEP_DURATION_MS}"
)));
}
let started = Instant::now();
let item = TurnItem::Sleep(SleepItem {
id: call_id,
duration_ms: args.duration_ms,
});
session.emit_turn_item_started(turn.as_ref(), &item).await;
let turn_state = session
.input_queue
.turn_state_for_sub_id(&session.active_turn, &turn.sub_id)
.await;
let (mut activity_rx, pending_activity) = session
.input_queue
.subscribe_activity(turn_state.as_deref())
.await;
let interrupted = if pending_activity.is_some() {
true
} else {
let sleep = tokio::time::sleep(Duration::from_millis(args.duration_ms));
tokio::pin!(sleep);
tokio::select! {
() = &mut sleep => false,
result = activity_rx.changed() => {
if result.is_ok() {
true
} else {
sleep.await;
false
}
}
}
};
session.emit_turn_item_completed(turn.as_ref(), item).await;
let message = if interrupted {
"Sleep interrupted by new input."
} else {
"Sleep completed."
};
let wall_time_seconds = started.elapsed().as_secs_f64();
Ok(boxed_tool_output(FunctionToolOutput::from_text(
format!("Wall time: {wall_time_seconds:.4} seconds\n{message}"),
/*success*/ Some(true),
)))
})
}
}
impl CoreToolRuntime for SleepHandler {}
+5
View File
@@ -22,6 +22,7 @@ use crate::tools::handlers::RequestPluginInstallHandler;
use crate::tools::handlers::RequestUserInputHandler;
use crate::tools::handlers::ShellCommandHandler;
use crate::tools::handlers::ShellCommandHandlerOptions;
use crate::tools::handlers::SleepHandler;
use crate::tools::handlers::TestSyncHandler;
use crate::tools::handlers::ToolSearchHandlerCache;
use crate::tools::handlers::ViewImageHandler;
@@ -666,6 +667,10 @@ fn add_core_utility_tools(context: &CoreToolPlanContext<'_>, planned_tools: &mut
planned_tools.add(GetContextRemainingHandler);
}
if features.enabled(Feature::SleepTool) {
planned_tools.add(SleepHandler);
}
if tool_suggest_enabled(turn_context)
&& let Some(discoverable_tools) =
context.discoverable_tools.filter(|tools| !tools.is_empty())
@@ -672,6 +672,21 @@ async fn host_context_gates_agent_job_tools() {
worker_agent_job.assert_visible_contains(&["spawn_agents_on_csv", "report_agent_job_result"]);
}
#[tokio::test]
async fn sleep_tool_follows_feature_gate() {
let disabled = probe(|turn| {
set_feature(turn, Feature::SleepTool, /*enabled*/ false);
})
.await;
disabled.assert_visible_lacks(&["sleep"]);
let enabled = probe(|turn| {
set_feature(turn, Feature::SleepTool, /*enabled*/ true);
})
.await;
enabled.assert_visible_contains(&["sleep"]);
}
#[tokio::test]
async fn mcp_and_tool_search_follow_direct_and_deferred_tool_exposure() {
let direct_mcp = probe_with(
+188 -11
View File
@@ -4,12 +4,15 @@ use std::sync::Arc;
use codex_core::CodexThread;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::items::SleepItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::user_input::UserInput;
use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
@@ -65,6 +68,34 @@ fn message_input_texts(body: &Value, role: &str) -> Vec<String> {
.collect()
}
fn function_call_output_text<'a>(body: &'a Value, call_id: &str) -> Option<&'a str> {
body.get("input")
.and_then(Value::as_array)?
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
})?
.get("output")?
.as_str()
}
fn assert_interrupted_sleep_output(output: Option<&str>) {
let Some(output) = output else {
panic!("sleep output missing");
};
let Some(wall_time) = output
.strip_prefix("Wall time: ")
.and_then(|output| output.strip_suffix(" seconds\nSleep interrupted by new input."))
else {
panic!("sleep output should include wall time");
};
assert!(
wall_time.parse::<f64>().is_ok(),
"sleep wall time should be a number"
);
}
fn chunk(event: Value) -> StreamingSseChunk {
StreamingSseChunk {
gate: None,
@@ -207,6 +238,54 @@ async fn wait_for_turn_complete(codex: &CodexThread) {
wait_for_event(codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
}
async fn wait_for_sleep_item_started(codex: &CodexThread, call_id: &str, duration_ms: u64) {
let event = wait_for_event(codex, |event| {
matches!(
event,
EventMsg::ItemStarted(started)
if matches!(&started.item, TurnItem::Sleep(item) if item.id == call_id)
)
})
.await;
let EventMsg::ItemStarted(started) = event else {
unreachable!("wait predicate only accepts item/started events");
};
let TurnItem::Sleep(item) = started.item else {
unreachable!("wait predicate only accepts sleep items");
};
assert_eq!(
item,
SleepItem {
id: call_id.to_string(),
duration_ms,
}
);
}
async fn wait_for_sleep_item_completed(codex: &CodexThread, call_id: &str, duration_ms: u64) {
let event = wait_for_event(codex, |event| {
matches!(
event,
EventMsg::ItemCompleted(completed)
if matches!(&completed.item, TurnItem::Sleep(item) if item.id == call_id)
)
})
.await;
let EventMsg::ItemCompleted(completed) = event else {
unreachable!("wait predicate only accepts item/completed events");
};
let TurnItem::Sleep(item) = completed.item else {
unreachable!("wait predicate only accepts sleep items");
};
assert_eq!(
item,
SleepItem {
id: call_id.to_string(),
duration_ms,
}
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steer_interrupts_wait_agent_and_is_sent_in_follow_up_request() {
const WAIT_CALL_ID: &str = "wait-call";
@@ -257,17 +336,7 @@ async fn steer_interrupts_wait_agent_and_is_sent_in_follow_up_request() {
relevant_user_input,
vec![INITIAL_PROMPT.to_string(), STEER_PROMPT.to_string()]
);
let wait_output = second["input"]
.as_array()
.expect("second request input")
.iter()
.find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str) == Some(WAIT_CALL_ID)
})
.and_then(|item| item.get("output"))
.and_then(Value::as_str)
.expect("wait_agent output");
let wait_output = function_call_output_text(&second, WAIT_CALL_ID).expect("wait_agent output");
assert_eq!(
serde_json::from_str::<Value>(wait_output).expect("parse wait_agent output"),
json!({
@@ -279,6 +348,114 @@ async fn steer_interrupts_wait_agent_and_is_sent_in_follow_up_request() {
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn any_new_input_interrupts_sleep() {
const FIRST_SLEEP_CALL_ID: &str = "sleep-call-1";
const SECOND_SLEEP_CALL_ID: &str = "sleep-call-2";
const SLEEP_DURATION_MS: u64 = 3_600_000;
const INITIAL_PROMPT: &str = "sleep for a while";
const STEER_PROMPT: &str = "stop sleeping and continue";
let sleep_arguments = json!({ "duration_ms": SLEEP_DURATION_MS }).to_string();
let first_chunks = vec![
chunk(ev_response_created("resp-1")),
chunk(ev_function_call(
FIRST_SLEEP_CALL_ID,
"sleep",
&sleep_arguments,
)),
chunk(ev_completed("resp-1")),
];
let second_chunks = vec![
chunk(ev_response_created("resp-2")),
chunk(ev_function_call(
SECOND_SLEEP_CALL_ID,
"sleep",
&sleep_arguments,
)),
chunk(ev_completed("resp-2")),
];
let (server, _completions) = start_streaming_sse_server(vec![
first_chunks,
second_chunks,
response_completed_chunks("resp-3"),
])
.await;
let codex = test_codex()
.with_model("gpt-5.4")
.with_config(|config| {
config
.features
.enable(Feature::SleepTool)
.expect("test config should allow feature update");
})
.build_with_streaming_server(&server)
.await
.expect("build Codex test session")
.codex;
submit_user_input(&codex, INITIAL_PROMPT).await;
wait_for_sleep_item_started(&codex, FIRST_SLEEP_CALL_ID, SLEEP_DURATION_MS).await;
steer_user_input(&codex, STEER_PROMPT).await;
wait_for_sleep_item_completed(&codex, FIRST_SLEEP_CALL_ID, SLEEP_DURATION_MS).await;
wait_for_sleep_item_started(&codex, SECOND_SLEEP_CALL_ID, SLEEP_DURATION_MS).await;
submit_queue_only_agent_mail(&codex, "new mailbox input").await;
wait_for_sleep_item_completed(&codex, SECOND_SLEEP_CALL_ID, SLEEP_DURATION_MS).await;
wait_for_turn_complete(&codex).await;
let requests = server.requests().await;
assert_eq!(requests.len(), 3);
let second: Value = from_slice(&requests[1]).expect("parse second request");
let relevant_user_input = message_input_texts(&second, "user")
.into_iter()
.filter(|text| text == INITIAL_PROMPT || text == STEER_PROMPT)
.collect::<Vec<_>>();
assert_eq!(
relevant_user_input,
vec![INITIAL_PROMPT.to_string(), STEER_PROMPT.to_string()]
);
assert_interrupted_sleep_output(function_call_output_text(&second, FIRST_SLEEP_CALL_ID));
let third: Value = from_slice(&requests[2]).expect("parse third request");
assert_interrupted_sleep_output(function_call_output_text(&third, SECOND_SLEEP_CALL_ID));
codex.submit(Op::Shutdown).await.expect("shutdown session");
wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await;
let rollout_path = codex.rollout_path().expect("rollout path");
let rollout = tokio::fs::read_to_string(rollout_path)
.await
.expect("read rollout");
let persisted_sleep_items = rollout
.lines()
.filter_map(|line| serde_json::from_str::<RolloutLine>(line).ok())
.filter_map(|line| match line.item {
RolloutItem::EventMsg(EventMsg::ItemCompleted(event)) => match event.item {
TurnItem::Sleep(item) => Some(item),
_ => None,
},
_ => None,
})
.collect::<Vec<_>>();
assert_eq!(
persisted_sleep_items,
vec![
SleepItem {
id: FIRST_SLEEP_CALL_ID.to_string(),
duration_ms: SLEEP_DURATION_MS,
},
SleepItem {
id: SECOND_SLEEP_CALL_ID.to_string(),
duration_ms: SLEEP_DURATION_MS,
},
]
);
server.shutdown().await;
}
fn assert_two_responses_input_snapshot(snapshot_name: &str, requests: &[Vec<u8>]) {
assert_eq!(requests.len(), 2);
let options = ContextSnapshotOptions::default().strip_capability_instructions();
+8
View File
@@ -201,6 +201,8 @@ pub enum Feature {
Goals,
/// Add current context-window metadata to model-visible context.
TokenBudget,
/// Expose an input-interruptible sleep tool.
SleepTool,
/// Route MCP tool approval prompts through the MCP elicitation request path.
ToolCallMcpElicitation,
/// Prompt Codex Apps connector auth failures through MCP URL elicitations.
@@ -1157,6 +1159,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::SleepTool,
key: "sleep_tool",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::CollaborationModes,
key: "collaboration_modes",
+9
View File
@@ -47,6 +47,7 @@ pub enum TurnItem {
Reasoning(ReasoningItem),
WebSearch(WebSearchItem),
ImageView(ImageViewItem),
Sleep(SleepItem),
ImageGeneration(ImageGenerationItem),
FileChange(FileChangeItem),
McpToolCall(McpToolCallItem),
@@ -140,6 +141,12 @@ pub struct ImageViewItem {
pub path: AbsolutePathBuf,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema, PartialEq, Eq)]
pub struct SleepItem {
pub id: String,
pub duration_ms: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema, PartialEq)]
pub struct ImageGenerationItem {
pub id: String,
@@ -576,6 +583,7 @@ impl TurnItem {
TurnItem::Reasoning(item) => item.id.clone(),
TurnItem::WebSearch(item) => item.id.clone(),
TurnItem::ImageView(item) => item.id.clone(),
TurnItem::Sleep(item) => item.id.clone(),
TurnItem::ImageGeneration(item) => item.id.clone(),
TurnItem::FileChange(item) => item.id.clone(),
TurnItem::McpToolCall(item) => item.id.clone(),
@@ -596,6 +604,7 @@ impl TurnItem {
path: item.path.clone(),
})]
}
TurnItem::Sleep(_) => Vec::new(),
TurnItem::ImageGeneration(item) => vec![item.as_legacy_event()],
TurnItem::FileChange(item) => item
.as_legacy_end_event(String::new())
+8 -4
View File
@@ -95,10 +95,14 @@ pub fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::ImageGenerationEnd(_)
| EventMsg::SubAgentActivity(_) => true,
EventMsg::ItemCompleted(event) => {
// Plan items are derived from streaming tags and are not part of the
// raw ResponseItem history, so we persist their completion to replay
// them on resume without bloating rollouts with every item lifecycle.
matches!(event.item, codex_protocol::items::TurnItem::Plan(_))
// These items have no equivalent raw ResponseItem or legacy event,
// so persist their completion for replay without retaining every
// item lifecycle event.
matches!(
event.item,
codex_protocol::items::TurnItem::Plan(_)
| codex_protocol::items::TurnItem::Sleep(_)
)
}
EventMsg::Error(_)
| EventMsg::GuardianAssessment(_)
+3 -1
View File
@@ -188,7 +188,9 @@ fn activity_summary(item: &ThreadItem) -> Option<String> {
ThreadItem::EnteredReviewMode { .. } => return Some("Entered review mode".to_string()),
ThreadItem::ExitedReviewMode { .. } => return Some("Exited review mode".to_string()),
ThreadItem::ContextCompaction { .. } => return Some("Compacted context".to_string()),
ThreadItem::UserMessage { .. } | ThreadItem::HookPrompt { .. } => return None,
ThreadItem::UserMessage { .. }
| ThreadItem::HookPrompt { .. }
| ThreadItem::Sleep { .. } => return None,
};
bounded_summary(summary)
}
+1
View File
@@ -193,6 +193,7 @@ impl ChatWidget {
}),
item @ ThreadItem::SubAgentActivity { .. } => self.on_sub_agent_activity(item),
ThreadItem::DynamicToolCall { .. } => {}
ThreadItem::Sleep { .. } => {}
}
if matches!(replay_kind, Some(ReplayKind::ThreadSnapshot)) && turn_id.is_empty() {
+2 -1
View File
@@ -226,7 +226,8 @@ fn fallback_transcript_cell(item: &ThreadItem) -> Option<PlainHistoryCell> {
ThreadItem::UserMessage { .. }
| ThreadItem::AgentMessage { .. }
| ThreadItem::Plan { .. }
| ThreadItem::Reasoning { .. } => return None,
| ThreadItem::Reasoning { .. }
| ThreadItem::Sleep { .. } => return None,
};
(!lines.is_empty()).then(|| PlainHistoryCell::new(lines))
}