mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
Add app-server current-time impl (varlatency 3/n) (#28835)
## What
Server should request:
```
{
"id": 42,
"method": "currentTime/read",
"params": {
"threadId": "11111111-1111-1111-1111-aaaaafdc2c11"
}
}
```
Client should respond with something like:
```rust
{
"id": 42,
"result": {
"currentTimeAt": 1781717655
}
}
```
## Why
Sessions configured with `clock_source = "external"` need a
thread-specific external time source before inference. The system clock
remains the default production provider.
## Validation
- `cargo test -p codex-app-server-protocol`
- `cargo test -p codex-app-server --test all
current_time_read_round_trip_adds_reminder_to_model_input`
- `cargo test -p codex-app-server
first_attestation_capable_connection_for_thread_only_uses_thread_subscribers`
- `cargo test -p codex-analytics`
- `just fix -p codex-app-server-protocol`
- `just fix -p codex-app-server`
Stacked on #28824.
This commit is contained in:
committed by
GitHub
Unverified
parent
0f89dd768c
commit
f4602b7516
@@ -16,4 +16,4 @@ import type { ToolRequestUserInputParams } from "./v2/ToolRequestUserInputParams
|
||||
/**
|
||||
* Request initiated from the server and sent to the client.
|
||||
*/
|
||||
export type ServerRequest = { "method": "item/commandExecution/requestApproval", id: RequestId, params: CommandExecutionRequestApprovalParams, } | { "method": "item/fileChange/requestApproval", id: RequestId, params: FileChangeRequestApprovalParams, } | { "method": "item/tool/requestUserInput", id: RequestId, params: ToolRequestUserInputParams, } | { "method": "mcpServer/elicitation/request", id: RequestId, params: McpServerElicitationRequestParams, } | { "method": "item/permissions/requestApproval", id: RequestId, params: PermissionsRequestApprovalParams, } | { "method": "item/tool/call", id: RequestId, params: DynamicToolCallParams, } | { "method": "account/chatgptAuthTokens/refresh", id: RequestId, params: ChatgptAuthTokensRefreshParams, } | { "method": "attestation/generate", id: RequestId, params: AttestationGenerateParams, } | { "method": "applyPatchApproval", id: RequestId, params: ApplyPatchApprovalParams, } | { "method": "execCommandApproval", id: RequestId, params: ExecCommandApprovalParams, };
|
||||
export type ServerRequest ={ "method": "item/commandExecution/requestApproval", id: RequestId, params: CommandExecutionRequestApprovalParams, } | { "method": "item/fileChange/requestApproval", id: RequestId, params: FileChangeRequestApprovalParams, } | { "method": "item/tool/requestUserInput", id: RequestId, params: ToolRequestUserInputParams, } | { "method": "mcpServer/elicitation/request", id: RequestId, params: McpServerElicitationRequestParams, } | { "method": "item/permissions/requestApproval", id: RequestId, params: PermissionsRequestApprovalParams, } | { "method": "item/tool/call", id: RequestId, params: DynamicToolCallParams, } | { "method": "account/chatgptAuthTokens/refresh", id: RequestId, params: ChatgptAuthTokensRefreshParams, } | { "method": "attestation/generate", id: RequestId, params: AttestationGenerateParams, } | { "method": "applyPatchApproval", id: RequestId, params: ApplyPatchApprovalParams, } | { "method": "execCommandApproval", id: RequestId, params: ExecCommandApprovalParams, };
|
||||
|
||||
@@ -14,6 +14,9 @@ use crate::export_server_responses;
|
||||
use crate::protocol::common::EXPERIMENTAL_CLIENT_METHOD_PARAM_TYPES;
|
||||
use crate::protocol::common::EXPERIMENTAL_CLIENT_METHOD_RESPONSE_TYPES;
|
||||
use crate::protocol::common::EXPERIMENTAL_CLIENT_METHODS;
|
||||
use crate::protocol::common::EXPERIMENTAL_SERVER_METHOD_PARAM_TYPES;
|
||||
use crate::protocol::common::EXPERIMENTAL_SERVER_METHOD_RESPONSE_TYPES;
|
||||
use crate::protocol::common::EXPERIMENTAL_SERVER_METHODS;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
@@ -249,10 +252,10 @@ fn filter_experimental_ts(out_dir: &Path) -> Result<()> {
|
||||
let registered_fields = experimental_fields();
|
||||
let experimental_method_types = experimental_method_types();
|
||||
// Most generated TS files are filtered by schema processing, but
|
||||
// `ClientRequest.ts` and any type with `#[experimental(...)]` fields need
|
||||
// direct post-processing because they encode method/field information in
|
||||
// file-local unions/interfaces.
|
||||
filter_client_request_ts(out_dir, EXPERIMENTAL_CLIENT_METHODS)?;
|
||||
// Request unions and types with `#[experimental(...)]` fields need direct
|
||||
// post-processing because they encode method/field information locally.
|
||||
filter_request_ts(out_dir, "ClientRequest.ts", EXPERIMENTAL_CLIENT_METHODS)?;
|
||||
filter_request_ts(out_dir, "ServerRequest.ts", EXPERIMENTAL_SERVER_METHODS)?;
|
||||
filter_experimental_type_fields_ts(out_dir, ®istered_fields)?;
|
||||
remove_generated_type_files(out_dir, &experimental_method_types, "ts")?;
|
||||
Ok(())
|
||||
@@ -261,10 +264,13 @@ fn filter_experimental_ts(out_dir: &Path) -> Result<()> {
|
||||
pub(crate) fn filter_experimental_ts_tree(tree: &mut BTreeMap<PathBuf, String>) -> Result<()> {
|
||||
let registered_fields = experimental_fields();
|
||||
let experimental_method_types = experimental_method_types();
|
||||
if let Some(content) = tree.get_mut(Path::new("ClientRequest.ts")) {
|
||||
let filtered =
|
||||
filter_client_request_ts_contents(std::mem::take(content), EXPERIMENTAL_CLIENT_METHODS);
|
||||
*content = filtered;
|
||||
for (file_name, experimental_methods) in [
|
||||
("ClientRequest.ts", EXPERIMENTAL_CLIENT_METHODS),
|
||||
("ServerRequest.ts", EXPERIMENTAL_SERVER_METHODS),
|
||||
] {
|
||||
if let Some(content) = tree.get_mut(Path::new(file_name)) {
|
||||
*content = filter_request_ts_contents(std::mem::take(content), experimental_methods);
|
||||
}
|
||||
}
|
||||
|
||||
let mut fields_by_type_name: HashMap<String, HashSet<String>> = HashMap::new();
|
||||
@@ -293,21 +299,21 @@ pub(crate) fn filter_experimental_ts_tree(tree: &mut BTreeMap<PathBuf, String>)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes union arms from `ClientRequest.ts` for methods marked experimental.
|
||||
fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Result<()> {
|
||||
let path = out_dir.join("ClientRequest.ts");
|
||||
/// Removes union arms from a generated request type for methods marked experimental.
|
||||
fn filter_request_ts(out_dir: &Path, file_name: &str, experimental_methods: &[&str]) -> Result<()> {
|
||||
let path = out_dir.join(file_name);
|
||||
if !path.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
let mut content =
|
||||
fs::read_to_string(&path).with_context(|| format!("Failed to read {}", path.display()))?;
|
||||
content = filter_client_request_ts_contents(content, experimental_methods);
|
||||
content = filter_request_ts_contents(content, experimental_methods);
|
||||
|
||||
fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn filter_client_request_ts_contents(mut content: String, experimental_methods: &[&str]) -> String {
|
||||
fn filter_request_ts_contents(mut content: String, experimental_methods: &[&str]) -> String {
|
||||
let Some((prefix, body, suffix)) = split_type_alias(&content) else {
|
||||
return content;
|
||||
};
|
||||
@@ -404,6 +410,7 @@ fn filter_experimental_schema(bundle: &mut Value) -> Result<()> {
|
||||
filter_experimental_fields_in_root(bundle, ®istered_fields);
|
||||
filter_experimental_fields_in_definitions(bundle, ®istered_fields);
|
||||
prune_experimental_methods(bundle, EXPERIMENTAL_CLIENT_METHODS);
|
||||
prune_experimental_methods(bundle, EXPERIMENTAL_SERVER_METHODS);
|
||||
remove_experimental_method_type_definitions(bundle);
|
||||
Ok(())
|
||||
}
|
||||
@@ -560,6 +567,8 @@ fn experimental_method_types() -> HashSet<String> {
|
||||
collect_experimental_type_names(EXPERIMENTAL_CLIENT_METHOD_PARAM_TYPES, &mut type_names);
|
||||
collect_experimental_type_names(EXPERIMENTAL_CLIENT_METHOD_RESPONSE_TYPES, &mut type_names);
|
||||
collect_experimental_type_names(EXPERIMENTAL_CLIENT_METHOD_DEPENDENCY_TYPES, &mut type_names);
|
||||
collect_experimental_type_names(EXPERIMENTAL_SERVER_METHOD_PARAM_TYPES, &mut type_names);
|
||||
collect_experimental_type_names(EXPERIMENTAL_SERVER_METHOD_RESPONSE_TYPES, &mut type_names);
|
||||
type_names
|
||||
}
|
||||
|
||||
@@ -2118,6 +2127,13 @@ mod tests {
|
||||
client_request_ts.contains("MockExperimentalMethodParams"),
|
||||
false
|
||||
);
|
||||
let server_request_ts = std::str::from_utf8(
|
||||
fixture_tree
|
||||
.get(Path::new("ServerRequest.ts"))
|
||||
.ok_or_else(|| anyhow::anyhow!("missing ServerRequest.ts fixture"))?,
|
||||
)?;
|
||||
assert_eq!(server_request_ts.contains("currentTime/read"), false);
|
||||
assert_eq!(server_request_ts.contains("CurrentTimeReadParams"), false);
|
||||
let typescript_index = std::str::from_utf8(
|
||||
fixture_tree
|
||||
.get(Path::new("index.ts"))
|
||||
@@ -2138,6 +2154,14 @@ mod tests {
|
||||
fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodResponse.ts")),
|
||||
false
|
||||
);
|
||||
assert_eq!(
|
||||
fixture_tree.contains_key(Path::new("v2/CurrentTimeReadParams.ts")),
|
||||
false
|
||||
);
|
||||
assert_eq!(
|
||||
fixture_tree.contains_key(Path::new("v2/CurrentTimeReadResponse.ts")),
|
||||
false
|
||||
);
|
||||
assert_eq!(
|
||||
fixture_tree.contains_key(Path::new("v2/RemoteControlClient.ts")),
|
||||
false
|
||||
|
||||
@@ -1184,7 +1184,8 @@ client_request_definitions! {
|
||||
macro_rules! server_request_definitions {
|
||||
(
|
||||
$(
|
||||
$(#[$variant_meta:meta])*
|
||||
$(#[experimental($reason:expr)])?
|
||||
$(#[doc = $variant_doc:literal])*
|
||||
$variant:ident $(=> $wire:literal)? {
|
||||
params: $params:ty,
|
||||
response: $response:ty,
|
||||
@@ -1197,7 +1198,7 @@ macro_rules! server_request_definitions {
|
||||
#[serde(tag = "method", rename_all = "camelCase")]
|
||||
pub enum ServerRequest {
|
||||
$(
|
||||
$(#[$variant_meta])*
|
||||
$(#[doc = $variant_doc])*
|
||||
$(#[serde(rename = $wire)] #[ts(rename = $wire)])?
|
||||
$variant {
|
||||
#[serde(rename = "id")]
|
||||
@@ -1237,7 +1238,7 @@ macro_rules! server_request_definitions {
|
||||
#[serde(tag = "method", rename_all = "camelCase")]
|
||||
pub enum ServerResponse {
|
||||
$(
|
||||
$(#[$variant_meta])*
|
||||
$(#[doc = $variant_doc])*
|
||||
$(#[serde(rename = $wire)])?
|
||||
$variant {
|
||||
#[serde(rename = "id")]
|
||||
@@ -1281,6 +1282,22 @@ macro_rules! server_request_definitions {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) const EXPERIMENTAL_SERVER_METHODS: &[&str] = &[
|
||||
$(
|
||||
experimental_method_entry!($(#[experimental($reason)])? $(=> $wire)?),
|
||||
)*
|
||||
];
|
||||
pub(crate) const EXPERIMENTAL_SERVER_METHOD_PARAM_TYPES: &[&str] = &[
|
||||
$(
|
||||
experimental_type_entry!($(#[experimental($reason)])? $params),
|
||||
)*
|
||||
];
|
||||
pub(crate) const EXPERIMENTAL_SERVER_METHOD_RESPONSE_TYPES: &[&str] = &[
|
||||
$(
|
||||
experimental_type_entry!($(#[experimental($reason)])? $response),
|
||||
)*
|
||||
];
|
||||
|
||||
pub fn export_server_responses(
|
||||
out_dir: &::std::path::Path,
|
||||
) -> ::std::result::Result<(), ::ts_rs::ExportError> {
|
||||
@@ -1470,6 +1487,13 @@ server_request_definitions! {
|
||||
response: v2::AttestationGenerateResponse,
|
||||
},
|
||||
|
||||
#[experimental("currentTime/read")]
|
||||
/// Read the current time from an external clock owned by the client.
|
||||
CurrentTimeRead => "currentTime/read" {
|
||||
params: v2::CurrentTimeReadParams,
|
||||
response: v2::CurrentTimeReadResponse,
|
||||
},
|
||||
|
||||
/// DEPRECATED APIs below
|
||||
/// Request to approve a patch.
|
||||
/// This request is used for Turns started via the legacy APIs (i.e. SendUserTurn, SendUserMessage).
|
||||
@@ -2372,6 +2396,32 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_current_time_read_request() -> Result<()> {
|
||||
let params = v2::CurrentTimeReadParams {
|
||||
thread_id: "thread-123".to_string(),
|
||||
};
|
||||
let request = ServerRequest::CurrentTimeRead {
|
||||
request_id: RequestId::Integer(10),
|
||||
params: params.clone(),
|
||||
};
|
||||
assert_eq!(
|
||||
json!({
|
||||
"method": "currentTime/read",
|
||||
"id": 10,
|
||||
"params": {
|
||||
"threadId": "thread-123"
|
||||
}
|
||||
}),
|
||||
serde_json::to_value(&request)?,
|
||||
);
|
||||
|
||||
let payload = ServerRequestPayload::CurrentTimeRead(params);
|
||||
assert_eq!(request.id(), &RequestId::Integer(10));
|
||||
assert_eq!(payload.request_with_id(RequestId::Integer(10)), request);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_server_response() -> Result<()> {
|
||||
let response = ServerResponse::CommandExecutionRequestApproval {
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
use schemars::JsonSchema;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use ts_rs::TS;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct CurrentTimeReadParams {
|
||||
pub thread_id: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct CurrentTimeReadResponse {
|
||||
/// Current time as whole Unix seconds.
|
||||
#[ts(type = "number")]
|
||||
pub current_time_at: i64,
|
||||
}
|
||||
@@ -6,6 +6,7 @@ mod attestation;
|
||||
mod collaboration_mode;
|
||||
mod command_exec;
|
||||
mod config;
|
||||
mod current_time;
|
||||
mod environment;
|
||||
mod experimental_feature;
|
||||
mod feedback;
|
||||
@@ -32,6 +33,7 @@ pub use attestation::*;
|
||||
pub use collaboration_mode::*;
|
||||
pub use command_exec::*;
|
||||
pub use config::*;
|
||||
pub use current_time::*;
|
||||
pub use environment::*;
|
||||
pub use experimental_feature::*;
|
||||
pub use feedback::*;
|
||||
|
||||
@@ -1469,6 +1469,10 @@ When the client responds to `item/tool/requestUserInput`, the server emits `serv
|
||||
|
||||
Desktop hosts that provide upstream attestation should set `capabilities.requestAttestation` during `initialize` and handle the server-initiated `attestation/generate` request. App-server issues it just in time before ChatGPT Codex requests that forward `x-oai-attestation`; the client responds with `{ "token": "v1.<opaque>" }`, where `token` is an opaque client-owned value. When app-server receives a client response, it forwards a consistent outer envelope such as `{ "v": 1, "s": 0, "t": "v1.<opaque>" }`, where `t` contains the client token unchanged. If app-server attempts attestation but fails within its own boundary, it sends the same envelope shape with an app-server status code and without `t` (`1 = timeout`, `2 = request failed`, `3 = request canceled`, `4 = malformed response`). If no initialized client opted into attestation, app-server omits `x-oai-attestation` for that upstream request.
|
||||
|
||||
### Current time
|
||||
|
||||
When `[features.current_time_reminder]` is enabled with `clock_source = "external"`, app-server sends the client subscribed to the thread an experimental `currentTime/read` request with `{ "threadId": "thr_123" }` when a time reminder is due. The client responds with `{ "currentTimeAt": 1781717655 }`, where `currentTimeAt` is an integer Unix timestamp in seconds. A failed, canceled, timed-out, or malformed response stops the turn before the model request is sent.
|
||||
|
||||
### MCP server elicitations
|
||||
|
||||
MCP servers can interrupt a turn and ask the client for structured input via `mcpServer/elicitation/request`.
|
||||
|
||||
@@ -0,0 +1,146 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::Weak;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::bail;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_app_server_protocol::CurrentTimeReadParams;
|
||||
use codex_app_server_protocol::CurrentTimeReadResponse;
|
||||
use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_core::TimeFuture;
|
||||
use codex_core::TimeProvider;
|
||||
use codex_protocol::ThreadId;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::timeout_at;
|
||||
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::thread_state::ThreadStateManager;
|
||||
|
||||
const CURRENT_TIME_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
pub(crate) fn app_server_time_provider(
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
thread_state_manager: ThreadStateManager,
|
||||
) -> Arc<dyn TimeProvider> {
|
||||
Arc::new(AppServerTimeProvider {
|
||||
outgoing: Arc::downgrade(&outgoing),
|
||||
thread_state_manager,
|
||||
})
|
||||
}
|
||||
|
||||
struct AppServerTimeProvider {
|
||||
outgoing: Weak<OutgoingMessageSender>,
|
||||
thread_state_manager: ThreadStateManager,
|
||||
}
|
||||
|
||||
impl TimeProvider for AppServerTimeProvider {
|
||||
fn current_time(&self, thread_id: ThreadId) -> TimeFuture<'_> {
|
||||
let outgoing = self.outgoing.clone();
|
||||
let thread_state_manager = self.thread_state_manager.clone();
|
||||
Box::pin(async move {
|
||||
let outgoing = outgoing
|
||||
.upgrade()
|
||||
.context("app-server current-time provider is unavailable")?;
|
||||
request_current_time(outgoing, thread_state_manager, thread_id).await
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_current_time(
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
thread_state_manager: ThreadStateManager,
|
||||
thread_id: ThreadId,
|
||||
) -> Result<DateTime<Utc>> {
|
||||
let deadline = Instant::now() + CURRENT_TIME_REQUEST_TIMEOUT;
|
||||
timeout_at(
|
||||
deadline,
|
||||
thread_state_manager.wait_for_thread_subscriber(thread_id),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
anyhow!(
|
||||
"timed out waiting for a client to subscribe to the thread after {}s",
|
||||
CURRENT_TIME_REQUEST_TIMEOUT.as_secs()
|
||||
)
|
||||
})?;
|
||||
let connection_ids = thread_state_manager
|
||||
.subscribed_connection_ids(thread_id)
|
||||
.await;
|
||||
let connection_id = require_single_current_time_connection(&connection_ids)?;
|
||||
let connection_ids = [connection_id];
|
||||
let (request_id, rx) = outgoing
|
||||
.send_request_to_connections(
|
||||
Some(&connection_ids),
|
||||
ServerRequestPayload::CurrentTimeRead(CurrentTimeReadParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
}),
|
||||
/*thread_id*/ None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let result = match timeout_at(deadline, rx).await {
|
||||
Ok(Ok(Ok(result))) => result,
|
||||
Ok(Ok(Err(err))) => {
|
||||
bail!(
|
||||
"current-time request failed: code={} message={}",
|
||||
err.code,
|
||||
err.message
|
||||
);
|
||||
}
|
||||
Ok(Err(err)) => bail!("current-time request was canceled: {err}"),
|
||||
Err(_) => {
|
||||
let _canceled = outgoing.cancel_request(&request_id).await;
|
||||
bail!(
|
||||
"current-time request timed out after {}s",
|
||||
CURRENT_TIME_REQUEST_TIMEOUT.as_secs()
|
||||
);
|
||||
}
|
||||
};
|
||||
let response: CurrentTimeReadResponse =
|
||||
serde_json::from_value(result).context("invalid current-time response")?;
|
||||
|
||||
DateTime::from_timestamp(response.current_time_at, 0)
|
||||
.ok_or_else(|| anyhow!("current-time response is outside the supported range"))
|
||||
}
|
||||
|
||||
fn require_single_current_time_connection(connection_ids: &[ConnectionId]) -> Result<ConnectionId> {
|
||||
// External clocks are not interchangeable, so do not choose one silently.
|
||||
match connection_ids {
|
||||
[connection_id] => Ok(*connection_id),
|
||||
_ => bail!(
|
||||
"expected exactly one client subscribed to the thread, found {}",
|
||||
connection_ids.len()
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::require_single_current_time_connection;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
|
||||
#[test]
|
||||
fn current_time_connection_must_be_unambiguous() {
|
||||
assert_eq!(
|
||||
require_single_current_time_connection(&[ConnectionId(7)]).unwrap(),
|
||||
ConnectionId(7)
|
||||
);
|
||||
assert_eq!(
|
||||
require_single_current_time_connection(&[])
|
||||
.unwrap_err()
|
||||
.to_string(),
|
||||
"expected exactly one client subscribed to the thread, found 0"
|
||||
);
|
||||
assert_eq!(
|
||||
require_single_current_time_connection(&[ConnectionId(7), ConnectionId(8)])
|
||||
.unwrap_err()
|
||||
.to_string(),
|
||||
"expected exactly one client subscribed to the thread, found 2"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -91,6 +91,7 @@ mod config_manager;
|
||||
mod config_manager_service;
|
||||
mod connection_cleanup;
|
||||
mod connection_rpc_gate;
|
||||
mod current_time;
|
||||
mod dynamic_tools;
|
||||
mod error_code;
|
||||
mod extensions;
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use crate::attestation::app_server_attestation_provider;
|
||||
use crate::config_manager::ConfigManager;
|
||||
use crate::connection_rpc_gate::ConnectionRpcGate;
|
||||
use crate::current_time::app_server_time_provider;
|
||||
use crate::error_code::invalid_request;
|
||||
use crate::extensions::ThreadExtensionDependencies;
|
||||
use crate::extensions::app_server_extension_event_sink;
|
||||
@@ -280,7 +281,6 @@ impl ConnectionSessionState {
|
||||
.get()
|
||||
.is_some_and(|session| session.supports_openai_form_elicitation)
|
||||
}
|
||||
|
||||
pub(crate) fn initialize(&self, session: InitializedConnectionSessionState) -> Result<(), ()> {
|
||||
self.initialized.set(session).map_err(|_| ())
|
||||
}
|
||||
@@ -379,7 +379,10 @@ impl MessageProcessor {
|
||||
outgoing.clone(),
|
||||
thread_state_manager.clone(),
|
||||
)),
|
||||
/*external_time_provider*/ None,
|
||||
Some(app_server_time_provider(
|
||||
outgoing.clone(),
|
||||
thread_state_manager.clone(),
|
||||
)),
|
||||
)
|
||||
});
|
||||
let models_manager = thread_manager.get_models_manager();
|
||||
|
||||
@@ -1381,6 +1381,31 @@ mod thread_processor_behavior_tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_for_thread_subscriber_unblocks_after_connection_attaches() -> Result<()> {
|
||||
let manager = ThreadStateManager::new();
|
||||
let thread_id = ThreadId::from_string("ba62fd70-2ec2-4b1b-9d94-355694332dd2")?;
|
||||
let connection = ConnectionId(1);
|
||||
manager
|
||||
.connection_initialized(connection, ConnectionCapabilities::default())
|
||||
.await;
|
||||
|
||||
let wait_for_subscriber = manager.wait_for_thread_subscriber(thread_id);
|
||||
let attach_connection = async {
|
||||
tokio::task::yield_now().await;
|
||||
manager
|
||||
.try_add_connection_to_thread(thread_id, connection)
|
||||
.await
|
||||
};
|
||||
let ((), attached) = tokio::time::timeout(Duration::from_secs(1), async {
|
||||
tokio::join!(wait_for_subscriber, attach_connection)
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert!(attached);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn closed_connection_cannot_be_reintroduced_by_auto_subscribe() -> Result<()> {
|
||||
let manager = ThreadStateManager::new();
|
||||
|
||||
@@ -329,6 +329,23 @@ impl ThreadStateManager {
|
||||
.min_by_key(|connection_id| connection_id.0)
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_thread_subscriber(&self, thread_id: ThreadId) {
|
||||
let mut has_connections = {
|
||||
let mut state = self.state.lock().await;
|
||||
state
|
||||
.threads
|
||||
.entry(thread_id)
|
||||
.or_default()
|
||||
.has_connections_watcher
|
||||
.subscribe()
|
||||
};
|
||||
while !*has_connections.borrow_and_update() {
|
||||
if has_connections.changed().await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn subscribed_connection_ids(&self, thread_id: ThreadId) -> Vec<ConnectionId> {
|
||||
let state = self.state.lock().await;
|
||||
state
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Result;
|
||||
use app_test_support::TestAppServer;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::CurrentTimeReadResponse;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
|
||||
#[cfg(windows)]
|
||||
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(25);
|
||||
#[cfg(not(windows))]
|
||||
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const CURRENT_TIME_AT: i64 = 1_781_717_655;
|
||||
const CURRENT_TIME_REMINDER: &str = "It is 2026-06-17 17:34:15 UTC.";
|
||||
|
||||
#[tokio::test]
|
||||
async fn current_time_read_round_trip_adds_reminder_to_model_input() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let response_mock = responses::mount_sse_once(
|
||||
&server,
|
||||
create_final_assistant_message_sse_response("Done")?,
|
||||
)
|
||||
.await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut app_server = TestAppServer::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, app_server.initialize()).await??;
|
||||
|
||||
let thread_request_id = app_server
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
let thread_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
app_server.read_stream_until_response_message(RequestId::Integer(thread_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response(thread_response)?;
|
||||
|
||||
let turn_request_id = app_server
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "What time is it?".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
app_server.read_stream_until_response_message(RequestId::Integer(turn_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: TurnStartResponse = to_response(turn_response)?;
|
||||
|
||||
let server_request = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
app_server.read_stream_until_request_message(),
|
||||
)
|
||||
.await??;
|
||||
let ServerRequest::CurrentTimeRead { request_id, params } = server_request else {
|
||||
panic!("expected CurrentTimeRead request, got: {server_request:?}");
|
||||
};
|
||||
assert_eq!(params.thread_id, thread.id);
|
||||
app_server
|
||||
.send_response(
|
||||
request_id,
|
||||
serde_json::to_value(CurrentTimeReadResponse {
|
||||
current_time_at: CURRENT_TIME_AT,
|
||||
})?,
|
||||
)
|
||||
.await?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
app_server.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
response_mock
|
||||
.single_request()
|
||||
.message_input_texts("developer")
|
||||
.iter()
|
||||
.any(|text| text == CURRENT_TIME_REMINDER)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
std::fs::write(
|
||||
codex_home.join("config.toml"),
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[features.current_time_reminder]
|
||||
enabled = true
|
||||
reminder_interval_model_requests = 1
|
||||
clock_source = "external"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
@@ -11,6 +11,7 @@ mod config_rpc;
|
||||
mod connection_handling_websocket;
|
||||
#[cfg(unix)]
|
||||
mod connection_handling_websocket_unix;
|
||||
mod current_time;
|
||||
mod dynamic_tools;
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
mod executor_mcp;
|
||||
|
||||
@@ -1719,6 +1719,15 @@ async fn handle_server_request(
|
||||
)
|
||||
.await
|
||||
}
|
||||
ServerRequest::CurrentTimeRead { request_id, .. } => {
|
||||
reject_server_request(
|
||||
client,
|
||||
request_id,
|
||||
&method,
|
||||
"external current time is not supported in exec mode".to_string(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
ServerRequest::ApplyPatchApproval { request_id, params } => {
|
||||
reject_server_request(
|
||||
client,
|
||||
|
||||
@@ -24,6 +24,9 @@ pub(super) fn server_request_thread_id(request: &ServerRequest) -> Option<Thread
|
||||
ServerRequest::DynamicToolCall { params, .. } => {
|
||||
ThreadId::from_string(¶ms.thread_id).ok()
|
||||
}
|
||||
ServerRequest::CurrentTimeRead { params, .. } => {
|
||||
ThreadId::from_string(¶ms.thread_id).ok()
|
||||
}
|
||||
ServerRequest::ChatgptAuthTokensRefresh { .. }
|
||||
| ServerRequest::AttestationGenerate { .. }
|
||||
| ServerRequest::ApplyPatchApproval { .. }
|
||||
|
||||
@@ -153,6 +153,12 @@ impl PendingAppServerRequests {
|
||||
message: "Attestation generation is not available in TUI.".to_string(),
|
||||
})
|
||||
}
|
||||
ServerRequest::CurrentTimeRead { request_id, .. } => {
|
||||
Some(UnsupportedAppServerRequest {
|
||||
request_id: request_id.clone(),
|
||||
message: "External current time is not available in TUI.".to_string(),
|
||||
})
|
||||
}
|
||||
ServerRequest::ApplyPatchApproval { request_id, .. } => {
|
||||
Some(UnsupportedAppServerRequest {
|
||||
request_id: request_id.clone(),
|
||||
@@ -352,6 +358,7 @@ impl PendingAppServerRequests {
|
||||
ServerRequest::DynamicToolCall { .. }
|
||||
| ServerRequest::ChatgptAuthTokensRefresh { .. }
|
||||
| ServerRequest::AttestationGenerate { .. }
|
||||
| ServerRequest::CurrentTimeRead { .. }
|
||||
| ServerRequest::ApplyPatchApproval { .. }
|
||||
| ServerRequest::ExecCommandApproval { .. } => true,
|
||||
}
|
||||
|
||||
@@ -97,6 +97,7 @@ impl SideParentStatus {
|
||||
| ServerRequest::ExecCommandApproval { .. } => Some(SideParentStatus::NeedsApproval),
|
||||
ServerRequest::DynamicToolCall { .. }
|
||||
| ServerRequest::AttestationGenerate { .. }
|
||||
| ServerRequest::CurrentTimeRead { .. }
|
||||
| ServerRequest::ChatgptAuthTokensRefresh { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ impl ChatWidget {
|
||||
}
|
||||
ServerRequest::DynamicToolCall { .. }
|
||||
| ServerRequest::AttestationGenerate { .. }
|
||||
| ServerRequest::CurrentTimeRead { .. }
|
||||
| ServerRequest::ChatgptAuthTokensRefresh { .. }
|
||||
| ServerRequest::ApplyPatchApproval { .. }
|
||||
| ServerRequest::ExecCommandApproval { .. } => {
|
||||
|
||||
Reference in New Issue
Block a user