From f4602b7516a28c09c89349300799ef347bbf7668 Mon Sep 17 00:00:00 2001 From: rka-oai Date: Thu, 18 Jun 2026 13:12:11 -0700 Subject: [PATCH] Add app-server current-time impl (varlatency 3/n) (#28835) ## What Server should request: ``` { "id": 42, "method": "currentTime/read", "params": { "threadId": "11111111-1111-1111-1111-aaaaafdc2c11" } } ``` Client should respond with something like: ```rust { "id": 42, "result": { "currentTimeAt": 1781717655 } } ``` ## Why Sessions configured with `clock_source = "external"` need a thread-specific external time source before inference. The system clock remains the default production provider. ## Validation - `cargo test -p codex-app-server-protocol` - `cargo test -p codex-app-server --test all current_time_read_round_trip_adds_reminder_to_model_input` - `cargo test -p codex-app-server first_attestation_capable_connection_for_thread_only_uses_thread_subscribers` - `cargo test -p codex-analytics` - `just fix -p codex-app-server-protocol` - `just fix -p codex-app-server` Stacked on #28824. --- .../schema/typescript/ServerRequest.ts | 2 +- codex-rs/app-server-protocol/src/export.rs | 50 ++++-- .../src/protocol/common.rs | 56 ++++++- .../src/protocol/v2/current_time.rs | 20 +++ .../src/protocol/v2/mod.rs | 2 + codex-rs/app-server/README.md | 4 + codex-rs/app-server/src/current_time.rs | 146 ++++++++++++++++++ codex-rs/app-server/src/lib.rs | 1 + codex-rs/app-server/src/message_processor.rs | 7 +- .../thread_processor_tests.rs | 25 +++ codex-rs/app-server/src/thread_state.rs | 17 ++ .../app-server/tests/suite/v2/current_time.rs | 130 ++++++++++++++++ codex-rs/app-server/tests/suite/v2/mod.rs | 1 + codex-rs/exec/src/lib.rs | 9 ++ .../tui/src/app/app_server_event_targets.rs | 3 + codex-rs/tui/src/app/app_server_requests.rs | 7 + codex-rs/tui/src/app/side.rs | 1 + .../tui/src/chatwidget/protocol_requests.rs | 1 + 18 files changed, 463 insertions(+), 19 deletions(-) create mode 100644 codex-rs/app-server-protocol/src/protocol/v2/current_time.rs create mode 100644 codex-rs/app-server/src/current_time.rs create mode 100644 codex-rs/app-server/tests/suite/v2/current_time.rs diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerRequest.ts b/codex-rs/app-server-protocol/schema/typescript/ServerRequest.ts index 80e9ffc11..89a544005 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerRequest.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerRequest.ts @@ -16,4 +16,4 @@ import type { ToolRequestUserInputParams } from "./v2/ToolRequestUserInputParams /** * Request initiated from the server and sent to the client. */ -export type ServerRequest = { "method": "item/commandExecution/requestApproval", id: RequestId, params: CommandExecutionRequestApprovalParams, } | { "method": "item/fileChange/requestApproval", id: RequestId, params: FileChangeRequestApprovalParams, } | { "method": "item/tool/requestUserInput", id: RequestId, params: ToolRequestUserInputParams, } | { "method": "mcpServer/elicitation/request", id: RequestId, params: McpServerElicitationRequestParams, } | { "method": "item/permissions/requestApproval", id: RequestId, params: PermissionsRequestApprovalParams, } | { "method": "item/tool/call", id: RequestId, params: DynamicToolCallParams, } | { "method": "account/chatgptAuthTokens/refresh", id: RequestId, params: ChatgptAuthTokensRefreshParams, } | { "method": "attestation/generate", id: RequestId, params: AttestationGenerateParams, } | { "method": "applyPatchApproval", id: RequestId, params: ApplyPatchApprovalParams, } | { "method": "execCommandApproval", id: RequestId, params: ExecCommandApprovalParams, }; +export type ServerRequest ={ "method": "item/commandExecution/requestApproval", id: RequestId, params: CommandExecutionRequestApprovalParams, } | { "method": "item/fileChange/requestApproval", id: RequestId, params: FileChangeRequestApprovalParams, } | { "method": "item/tool/requestUserInput", id: RequestId, params: ToolRequestUserInputParams, } | { "method": "mcpServer/elicitation/request", id: RequestId, params: McpServerElicitationRequestParams, } | { "method": "item/permissions/requestApproval", id: RequestId, params: PermissionsRequestApprovalParams, } | { "method": "item/tool/call", id: RequestId, params: DynamicToolCallParams, } | { "method": "account/chatgptAuthTokens/refresh", id: RequestId, params: ChatgptAuthTokensRefreshParams, } | { "method": "attestation/generate", id: RequestId, params: AttestationGenerateParams, } | { "method": "applyPatchApproval", id: RequestId, params: ApplyPatchApprovalParams, } | { "method": "execCommandApproval", id: RequestId, params: ExecCommandApprovalParams, }; diff --git a/codex-rs/app-server-protocol/src/export.rs b/codex-rs/app-server-protocol/src/export.rs index 5e6c2ad01..91eca690f 100644 --- a/codex-rs/app-server-protocol/src/export.rs +++ b/codex-rs/app-server-protocol/src/export.rs @@ -14,6 +14,9 @@ use crate::export_server_responses; use crate::protocol::common::EXPERIMENTAL_CLIENT_METHOD_PARAM_TYPES; use crate::protocol::common::EXPERIMENTAL_CLIENT_METHOD_RESPONSE_TYPES; use crate::protocol::common::EXPERIMENTAL_CLIENT_METHODS; +use crate::protocol::common::EXPERIMENTAL_SERVER_METHOD_PARAM_TYPES; +use crate::protocol::common::EXPERIMENTAL_SERVER_METHOD_RESPONSE_TYPES; +use crate::protocol::common::EXPERIMENTAL_SERVER_METHODS; use anyhow::Context; use anyhow::Result; use anyhow::anyhow; @@ -249,10 +252,10 @@ fn filter_experimental_ts(out_dir: &Path) -> Result<()> { let registered_fields = experimental_fields(); let experimental_method_types = experimental_method_types(); // Most generated TS files are filtered by schema processing, but - // `ClientRequest.ts` and any type with `#[experimental(...)]` fields need - // direct post-processing because they encode method/field information in - // file-local unions/interfaces. - filter_client_request_ts(out_dir, EXPERIMENTAL_CLIENT_METHODS)?; + // Request unions and types with `#[experimental(...)]` fields need direct + // post-processing because they encode method/field information locally. + filter_request_ts(out_dir, "ClientRequest.ts", EXPERIMENTAL_CLIENT_METHODS)?; + filter_request_ts(out_dir, "ServerRequest.ts", EXPERIMENTAL_SERVER_METHODS)?; filter_experimental_type_fields_ts(out_dir, ®istered_fields)?; remove_generated_type_files(out_dir, &experimental_method_types, "ts")?; Ok(()) @@ -261,10 +264,13 @@ fn filter_experimental_ts(out_dir: &Path) -> Result<()> { pub(crate) fn filter_experimental_ts_tree(tree: &mut BTreeMap) -> Result<()> { let registered_fields = experimental_fields(); let experimental_method_types = experimental_method_types(); - if let Some(content) = tree.get_mut(Path::new("ClientRequest.ts")) { - let filtered = - filter_client_request_ts_contents(std::mem::take(content), EXPERIMENTAL_CLIENT_METHODS); - *content = filtered; + for (file_name, experimental_methods) in [ + ("ClientRequest.ts", EXPERIMENTAL_CLIENT_METHODS), + ("ServerRequest.ts", EXPERIMENTAL_SERVER_METHODS), + ] { + if let Some(content) = tree.get_mut(Path::new(file_name)) { + *content = filter_request_ts_contents(std::mem::take(content), experimental_methods); + } } let mut fields_by_type_name: HashMap> = HashMap::new(); @@ -293,21 +299,21 @@ pub(crate) fn filter_experimental_ts_tree(tree: &mut BTreeMap) Ok(()) } -/// Removes union arms from `ClientRequest.ts` for methods marked experimental. -fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Result<()> { - let path = out_dir.join("ClientRequest.ts"); +/// Removes union arms from a generated request type for methods marked experimental. +fn filter_request_ts(out_dir: &Path, file_name: &str, experimental_methods: &[&str]) -> Result<()> { + let path = out_dir.join(file_name); if !path.exists() { return Ok(()); } let mut content = fs::read_to_string(&path).with_context(|| format!("Failed to read {}", path.display()))?; - content = filter_client_request_ts_contents(content, experimental_methods); + content = filter_request_ts_contents(content, experimental_methods); fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?; Ok(()) } -fn filter_client_request_ts_contents(mut content: String, experimental_methods: &[&str]) -> String { +fn filter_request_ts_contents(mut content: String, experimental_methods: &[&str]) -> String { let Some((prefix, body, suffix)) = split_type_alias(&content) else { return content; }; @@ -404,6 +410,7 @@ fn filter_experimental_schema(bundle: &mut Value) -> Result<()> { filter_experimental_fields_in_root(bundle, ®istered_fields); filter_experimental_fields_in_definitions(bundle, ®istered_fields); prune_experimental_methods(bundle, EXPERIMENTAL_CLIENT_METHODS); + prune_experimental_methods(bundle, EXPERIMENTAL_SERVER_METHODS); remove_experimental_method_type_definitions(bundle); Ok(()) } @@ -560,6 +567,8 @@ fn experimental_method_types() -> HashSet { collect_experimental_type_names(EXPERIMENTAL_CLIENT_METHOD_PARAM_TYPES, &mut type_names); collect_experimental_type_names(EXPERIMENTAL_CLIENT_METHOD_RESPONSE_TYPES, &mut type_names); collect_experimental_type_names(EXPERIMENTAL_CLIENT_METHOD_DEPENDENCY_TYPES, &mut type_names); + collect_experimental_type_names(EXPERIMENTAL_SERVER_METHOD_PARAM_TYPES, &mut type_names); + collect_experimental_type_names(EXPERIMENTAL_SERVER_METHOD_RESPONSE_TYPES, &mut type_names); type_names } @@ -2118,6 +2127,13 @@ mod tests { client_request_ts.contains("MockExperimentalMethodParams"), false ); + let server_request_ts = std::str::from_utf8( + fixture_tree + .get(Path::new("ServerRequest.ts")) + .ok_or_else(|| anyhow::anyhow!("missing ServerRequest.ts fixture"))?, + )?; + assert_eq!(server_request_ts.contains("currentTime/read"), false); + assert_eq!(server_request_ts.contains("CurrentTimeReadParams"), false); let typescript_index = std::str::from_utf8( fixture_tree .get(Path::new("index.ts")) @@ -2138,6 +2154,14 @@ mod tests { fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodResponse.ts")), false ); + assert_eq!( + fixture_tree.contains_key(Path::new("v2/CurrentTimeReadParams.ts")), + false + ); + assert_eq!( + fixture_tree.contains_key(Path::new("v2/CurrentTimeReadResponse.ts")), + false + ); assert_eq!( fixture_tree.contains_key(Path::new("v2/RemoteControlClient.ts")), false diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index c1fac138a..e5a882b3b 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -1184,7 +1184,8 @@ client_request_definitions! { macro_rules! server_request_definitions { ( $( - $(#[$variant_meta:meta])* + $(#[experimental($reason:expr)])? + $(#[doc = $variant_doc:literal])* $variant:ident $(=> $wire:literal)? { params: $params:ty, response: $response:ty, @@ -1197,7 +1198,7 @@ macro_rules! server_request_definitions { #[serde(tag = "method", rename_all = "camelCase")] pub enum ServerRequest { $( - $(#[$variant_meta])* + $(#[doc = $variant_doc])* $(#[serde(rename = $wire)] #[ts(rename = $wire)])? $variant { #[serde(rename = "id")] @@ -1237,7 +1238,7 @@ macro_rules! server_request_definitions { #[serde(tag = "method", rename_all = "camelCase")] pub enum ServerResponse { $( - $(#[$variant_meta])* + $(#[doc = $variant_doc])* $(#[serde(rename = $wire)])? $variant { #[serde(rename = "id")] @@ -1281,6 +1282,22 @@ macro_rules! server_request_definitions { } } + pub(crate) const EXPERIMENTAL_SERVER_METHODS: &[&str] = &[ + $( + experimental_method_entry!($(#[experimental($reason)])? $(=> $wire)?), + )* + ]; + pub(crate) const EXPERIMENTAL_SERVER_METHOD_PARAM_TYPES: &[&str] = &[ + $( + experimental_type_entry!($(#[experimental($reason)])? $params), + )* + ]; + pub(crate) const EXPERIMENTAL_SERVER_METHOD_RESPONSE_TYPES: &[&str] = &[ + $( + experimental_type_entry!($(#[experimental($reason)])? $response), + )* + ]; + pub fn export_server_responses( out_dir: &::std::path::Path, ) -> ::std::result::Result<(), ::ts_rs::ExportError> { @@ -1470,6 +1487,13 @@ server_request_definitions! { response: v2::AttestationGenerateResponse, }, + #[experimental("currentTime/read")] + /// Read the current time from an external clock owned by the client. + CurrentTimeRead => "currentTime/read" { + params: v2::CurrentTimeReadParams, + response: v2::CurrentTimeReadResponse, + }, + /// DEPRECATED APIs below /// Request to approve a patch. /// This request is used for Turns started via the legacy APIs (i.e. SendUserTurn, SendUserMessage). @@ -2372,6 +2396,32 @@ mod tests { Ok(()) } + #[test] + fn serialize_current_time_read_request() -> Result<()> { + let params = v2::CurrentTimeReadParams { + thread_id: "thread-123".to_string(), + }; + let request = ServerRequest::CurrentTimeRead { + request_id: RequestId::Integer(10), + params: params.clone(), + }; + assert_eq!( + json!({ + "method": "currentTime/read", + "id": 10, + "params": { + "threadId": "thread-123" + } + }), + serde_json::to_value(&request)?, + ); + + let payload = ServerRequestPayload::CurrentTimeRead(params); + assert_eq!(request.id(), &RequestId::Integer(10)); + assert_eq!(payload.request_with_id(RequestId::Integer(10)), request); + Ok(()) + } + #[test] fn serialize_server_response() -> Result<()> { let response = ServerResponse::CommandExecutionRequestApproval { diff --git a/codex-rs/app-server-protocol/src/protocol/v2/current_time.rs b/codex-rs/app-server-protocol/src/protocol/v2/current_time.rs new file mode 100644 index 000000000..9fd23f798 --- /dev/null +++ b/codex-rs/app-server-protocol/src/protocol/v2/current_time.rs @@ -0,0 +1,20 @@ +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; +use ts_rs::TS; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct CurrentTimeReadParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct CurrentTimeReadResponse { + /// Current time as whole Unix seconds. + #[ts(type = "number")] + pub current_time_at: i64, +} diff --git a/codex-rs/app-server-protocol/src/protocol/v2/mod.rs b/codex-rs/app-server-protocol/src/protocol/v2/mod.rs index b5fa9fdc6..1097b0381 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/mod.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/mod.rs @@ -6,6 +6,7 @@ mod attestation; mod collaboration_mode; mod command_exec; mod config; +mod current_time; mod environment; mod experimental_feature; mod feedback; @@ -32,6 +33,7 @@ pub use attestation::*; pub use collaboration_mode::*; pub use command_exec::*; pub use config::*; +pub use current_time::*; pub use environment::*; pub use experimental_feature::*; pub use feedback::*; diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 183c7c600..b9c7899fc 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -1469,6 +1469,10 @@ When the client responds to `item/tool/requestUserInput`, the server emits `serv Desktop hosts that provide upstream attestation should set `capabilities.requestAttestation` during `initialize` and handle the server-initiated `attestation/generate` request. App-server issues it just in time before ChatGPT Codex requests that forward `x-oai-attestation`; the client responds with `{ "token": "v1." }`, where `token` is an opaque client-owned value. When app-server receives a client response, it forwards a consistent outer envelope such as `{ "v": 1, "s": 0, "t": "v1." }`, where `t` contains the client token unchanged. If app-server attempts attestation but fails within its own boundary, it sends the same envelope shape with an app-server status code and without `t` (`1 = timeout`, `2 = request failed`, `3 = request canceled`, `4 = malformed response`). If no initialized client opted into attestation, app-server omits `x-oai-attestation` for that upstream request. +### Current time + +When `[features.current_time_reminder]` is enabled with `clock_source = "external"`, app-server sends the client subscribed to the thread an experimental `currentTime/read` request with `{ "threadId": "thr_123" }` when a time reminder is due. The client responds with `{ "currentTimeAt": 1781717655 }`, where `currentTimeAt` is an integer Unix timestamp in seconds. A failed, canceled, timed-out, or malformed response stops the turn before the model request is sent. + ### MCP server elicitations MCP servers can interrupt a turn and ask the client for structured input via `mcpServer/elicitation/request`. diff --git a/codex-rs/app-server/src/current_time.rs b/codex-rs/app-server/src/current_time.rs new file mode 100644 index 000000000..9b76d337a --- /dev/null +++ b/codex-rs/app-server/src/current_time.rs @@ -0,0 +1,146 @@ +use std::sync::Arc; +use std::sync::Weak; + +use anyhow::Context; +use anyhow::Result; +use anyhow::anyhow; +use anyhow::bail; +use chrono::DateTime; +use chrono::Utc; +use codex_app_server_protocol::CurrentTimeReadParams; +use codex_app_server_protocol::CurrentTimeReadResponse; +use codex_app_server_protocol::ServerRequestPayload; +use codex_core::TimeFuture; +use codex_core::TimeProvider; +use codex_protocol::ThreadId; +use tokio::time::Duration; +use tokio::time::Instant; +use tokio::time::timeout_at; + +use crate::outgoing_message::ConnectionId; +use crate::outgoing_message::OutgoingMessageSender; +use crate::thread_state::ThreadStateManager; + +const CURRENT_TIME_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); + +pub(crate) fn app_server_time_provider( + outgoing: Arc, + thread_state_manager: ThreadStateManager, +) -> Arc { + Arc::new(AppServerTimeProvider { + outgoing: Arc::downgrade(&outgoing), + thread_state_manager, + }) +} + +struct AppServerTimeProvider { + outgoing: Weak, + thread_state_manager: ThreadStateManager, +} + +impl TimeProvider for AppServerTimeProvider { + fn current_time(&self, thread_id: ThreadId) -> TimeFuture<'_> { + let outgoing = self.outgoing.clone(); + let thread_state_manager = self.thread_state_manager.clone(); + Box::pin(async move { + let outgoing = outgoing + .upgrade() + .context("app-server current-time provider is unavailable")?; + request_current_time(outgoing, thread_state_manager, thread_id).await + }) + } +} + +async fn request_current_time( + outgoing: Arc, + thread_state_manager: ThreadStateManager, + thread_id: ThreadId, +) -> Result> { + let deadline = Instant::now() + CURRENT_TIME_REQUEST_TIMEOUT; + timeout_at( + deadline, + thread_state_manager.wait_for_thread_subscriber(thread_id), + ) + .await + .map_err(|_| { + anyhow!( + "timed out waiting for a client to subscribe to the thread after {}s", + CURRENT_TIME_REQUEST_TIMEOUT.as_secs() + ) + })?; + let connection_ids = thread_state_manager + .subscribed_connection_ids(thread_id) + .await; + let connection_id = require_single_current_time_connection(&connection_ids)?; + let connection_ids = [connection_id]; + let (request_id, rx) = outgoing + .send_request_to_connections( + Some(&connection_ids), + ServerRequestPayload::CurrentTimeRead(CurrentTimeReadParams { + thread_id: thread_id.to_string(), + }), + /*thread_id*/ None, + ) + .await; + + let result = match timeout_at(deadline, rx).await { + Ok(Ok(Ok(result))) => result, + Ok(Ok(Err(err))) => { + bail!( + "current-time request failed: code={} message={}", + err.code, + err.message + ); + } + Ok(Err(err)) => bail!("current-time request was canceled: {err}"), + Err(_) => { + let _canceled = outgoing.cancel_request(&request_id).await; + bail!( + "current-time request timed out after {}s", + CURRENT_TIME_REQUEST_TIMEOUT.as_secs() + ); + } + }; + let response: CurrentTimeReadResponse = + serde_json::from_value(result).context("invalid current-time response")?; + + DateTime::from_timestamp(response.current_time_at, 0) + .ok_or_else(|| anyhow!("current-time response is outside the supported range")) +} + +fn require_single_current_time_connection(connection_ids: &[ConnectionId]) -> Result { + // External clocks are not interchangeable, so do not choose one silently. + match connection_ids { + [connection_id] => Ok(*connection_id), + _ => bail!( + "expected exactly one client subscribed to the thread, found {}", + connection_ids.len() + ), + } +} + +#[cfg(test)] +mod tests { + use super::require_single_current_time_connection; + use crate::outgoing_message::ConnectionId; + + #[test] + fn current_time_connection_must_be_unambiguous() { + assert_eq!( + require_single_current_time_connection(&[ConnectionId(7)]).unwrap(), + ConnectionId(7) + ); + assert_eq!( + require_single_current_time_connection(&[]) + .unwrap_err() + .to_string(), + "expected exactly one client subscribed to the thread, found 0" + ); + assert_eq!( + require_single_current_time_connection(&[ConnectionId(7), ConnectionId(8)]) + .unwrap_err() + .to_string(), + "expected exactly one client subscribed to the thread, found 2" + ); + } +} diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index cac2db54e..f4cd83d73 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -91,6 +91,7 @@ mod config_manager; mod config_manager_service; mod connection_cleanup; mod connection_rpc_gate; +mod current_time; mod dynamic_tools; mod error_code; mod extensions; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 0f9f6d332..ad29e0501 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -7,6 +7,7 @@ use std::sync::atomic::AtomicBool; use crate::attestation::app_server_attestation_provider; use crate::config_manager::ConfigManager; use crate::connection_rpc_gate::ConnectionRpcGate; +use crate::current_time::app_server_time_provider; use crate::error_code::invalid_request; use crate::extensions::ThreadExtensionDependencies; use crate::extensions::app_server_extension_event_sink; @@ -280,7 +281,6 @@ impl ConnectionSessionState { .get() .is_some_and(|session| session.supports_openai_form_elicitation) } - pub(crate) fn initialize(&self, session: InitializedConnectionSessionState) -> Result<(), ()> { self.initialized.set(session).map_err(|_| ()) } @@ -379,7 +379,10 @@ impl MessageProcessor { outgoing.clone(), thread_state_manager.clone(), )), - /*external_time_provider*/ None, + Some(app_server_time_provider( + outgoing.clone(), + thread_state_manager.clone(), + )), ) }); let models_manager = thread_manager.get_models_manager(); diff --git a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs index 1f333f1eb..3e336cc2a 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs @@ -1381,6 +1381,31 @@ mod thread_processor_behavior_tests { Ok(()) } + #[tokio::test] + async fn wait_for_thread_subscriber_unblocks_after_connection_attaches() -> Result<()> { + let manager = ThreadStateManager::new(); + let thread_id = ThreadId::from_string("ba62fd70-2ec2-4b1b-9d94-355694332dd2")?; + let connection = ConnectionId(1); + manager + .connection_initialized(connection, ConnectionCapabilities::default()) + .await; + + let wait_for_subscriber = manager.wait_for_thread_subscriber(thread_id); + let attach_connection = async { + tokio::task::yield_now().await; + manager + .try_add_connection_to_thread(thread_id, connection) + .await + }; + let ((), attached) = tokio::time::timeout(Duration::from_secs(1), async { + tokio::join!(wait_for_subscriber, attach_connection) + }) + .await?; + + assert!(attached); + Ok(()) + } + #[tokio::test] async fn closed_connection_cannot_be_reintroduced_by_auto_subscribe() -> Result<()> { let manager = ThreadStateManager::new(); diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 6d2b48a4c..890d4da5e 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -329,6 +329,23 @@ impl ThreadStateManager { .min_by_key(|connection_id| connection_id.0) } + pub(crate) async fn wait_for_thread_subscriber(&self, thread_id: ThreadId) { + let mut has_connections = { + let mut state = self.state.lock().await; + state + .threads + .entry(thread_id) + .or_default() + .has_connections_watcher + .subscribe() + }; + while !*has_connections.borrow_and_update() { + if has_connections.changed().await.is_err() { + break; + } + } + } + pub(crate) async fn subscribed_connection_ids(&self, thread_id: ThreadId) -> Vec { let state = self.state.lock().await; state diff --git a/codex-rs/app-server/tests/suite/v2/current_time.rs b/codex-rs/app-server/tests/suite/v2/current_time.rs new file mode 100644 index 000000000..74e734ae3 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/current_time.rs @@ -0,0 +1,130 @@ +use std::path::Path; + +use anyhow::Result; +use app_test_support::TestAppServer; +use app_test_support::create_final_assistant_message_sse_response; +use app_test_support::to_response; +use codex_app_server_protocol::CurrentTimeReadResponse; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::UserInput; +use core_test_support::responses; +use core_test_support::skip_if_no_network; +use pretty_assertions::assert_eq; +use tempfile::TempDir; +use tokio::time::Duration; +use tokio::time::timeout; + +#[cfg(windows)] +const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(25); +#[cfg(not(windows))] +const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10); +const CURRENT_TIME_AT: i64 = 1_781_717_655; +const CURRENT_TIME_REMINDER: &str = "It is 2026-06-17 17:34:15 UTC."; + +#[tokio::test] +async fn current_time_read_round_trip_adds_reminder_to_model_input() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let response_mock = responses::mount_sse_once( + &server, + create_final_assistant_message_sse_response("Done")?, + ) + .await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut app_server = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, app_server.initialize()).await??; + + let thread_request_id = app_server + .send_thread_start_request(ThreadStartParams::default()) + .await?; + let thread_response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(thread_request_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response(thread_response)?; + + let turn_request_id = app_server + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "What time is it?".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let turn_response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(turn_request_id)), + ) + .await??; + let _: TurnStartResponse = to_response(turn_response)?; + + let server_request = timeout( + DEFAULT_READ_TIMEOUT, + app_server.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::CurrentTimeRead { request_id, params } = server_request else { + panic!("expected CurrentTimeRead request, got: {server_request:?}"); + }; + assert_eq!(params.thread_id, thread.id); + app_server + .send_response( + request_id, + serde_json::to_value(CurrentTimeReadResponse { + current_time_at: CURRENT_TIME_AT, + })?, + ) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + app_server.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + assert!( + response_mock + .single_request() + .message_input_texts("developer") + .iter() + .any(|text| text == CURRENT_TIME_REMINDER) + ); + Ok(()) +} + +fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + std::fs::write( + codex_home.join("config.toml"), + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" +model_provider = "mock_provider" + +[features.current_time_reminder] +enabled = true +reminder_interval_model_requests = 1 +clock_source = "external" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 9d07da066..2745e8ee0 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -11,6 +11,7 @@ mod config_rpc; mod connection_handling_websocket; #[cfg(unix)] mod connection_handling_websocket_unix; +mod current_time; mod dynamic_tools; #[cfg(not(target_os = "windows"))] mod executor_mcp; diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index a1f4b95b5..9745fa878 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -1719,6 +1719,15 @@ async fn handle_server_request( ) .await } + ServerRequest::CurrentTimeRead { request_id, .. } => { + reject_server_request( + client, + request_id, + &method, + "external current time is not supported in exec mode".to_string(), + ) + .await + } ServerRequest::ApplyPatchApproval { request_id, params } => { reject_server_request( client, diff --git a/codex-rs/tui/src/app/app_server_event_targets.rs b/codex-rs/tui/src/app/app_server_event_targets.rs index 90f688f7e..e29d0e9d5 100644 --- a/codex-rs/tui/src/app/app_server_event_targets.rs +++ b/codex-rs/tui/src/app/app_server_event_targets.rs @@ -24,6 +24,9 @@ pub(super) fn server_request_thread_id(request: &ServerRequest) -> Option { ThreadId::from_string(¶ms.thread_id).ok() } + ServerRequest::CurrentTimeRead { params, .. } => { + ThreadId::from_string(¶ms.thread_id).ok() + } ServerRequest::ChatgptAuthTokensRefresh { .. } | ServerRequest::AttestationGenerate { .. } | ServerRequest::ApplyPatchApproval { .. } diff --git a/codex-rs/tui/src/app/app_server_requests.rs b/codex-rs/tui/src/app/app_server_requests.rs index d61e84dce..0b7289768 100644 --- a/codex-rs/tui/src/app/app_server_requests.rs +++ b/codex-rs/tui/src/app/app_server_requests.rs @@ -153,6 +153,12 @@ impl PendingAppServerRequests { message: "Attestation generation is not available in TUI.".to_string(), }) } + ServerRequest::CurrentTimeRead { request_id, .. } => { + Some(UnsupportedAppServerRequest { + request_id: request_id.clone(), + message: "External current time is not available in TUI.".to_string(), + }) + } ServerRequest::ApplyPatchApproval { request_id, .. } => { Some(UnsupportedAppServerRequest { request_id: request_id.clone(), @@ -352,6 +358,7 @@ impl PendingAppServerRequests { ServerRequest::DynamicToolCall { .. } | ServerRequest::ChatgptAuthTokensRefresh { .. } | ServerRequest::AttestationGenerate { .. } + | ServerRequest::CurrentTimeRead { .. } | ServerRequest::ApplyPatchApproval { .. } | ServerRequest::ExecCommandApproval { .. } => true, } diff --git a/codex-rs/tui/src/app/side.rs b/codex-rs/tui/src/app/side.rs index 167fc5c7c..0b69153fa 100644 --- a/codex-rs/tui/src/app/side.rs +++ b/codex-rs/tui/src/app/side.rs @@ -97,6 +97,7 @@ impl SideParentStatus { | ServerRequest::ExecCommandApproval { .. } => Some(SideParentStatus::NeedsApproval), ServerRequest::DynamicToolCall { .. } | ServerRequest::AttestationGenerate { .. } + | ServerRequest::CurrentTimeRead { .. } | ServerRequest::ChatgptAuthTokensRefresh { .. } => None, } } diff --git a/codex-rs/tui/src/chatwidget/protocol_requests.rs b/codex-rs/tui/src/chatwidget/protocol_requests.rs index 2f33c9809..5729c073f 100644 --- a/codex-rs/tui/src/chatwidget/protocol_requests.rs +++ b/codex-rs/tui/src/chatwidget/protocol_requests.rs @@ -46,6 +46,7 @@ impl ChatWidget { } ServerRequest::DynamicToolCall { .. } | ServerRequest::AttestationGenerate { .. } + | ServerRequest::CurrentTimeRead { .. } | ServerRequest::ChatgptAuthTokensRefresh { .. } | ServerRequest::ApplyPatchApproval { .. } | ServerRequest::ExecCommandApproval { .. } => {