mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
Add support for UDS in codex --remote (#22414)
## Why Added support for UDS connections in `codex --remote`. TUI also now connects to local app-server using UDS by default if it is running and set to listen to UDS connection. ## What Changed - Introduced `RemoteAppServerEndpoint` with `WebSocket` and `UnixSocket` variants. - Reused the existing JSON-RPC-over-WebSocket protocol over either a TCP WebSocket stream or a UDS stream. - Updated `codex --remote` to accept `ws://host:port`, `wss://host:port`, `unix://`, and `unix://PATH`. - Kept `--remote-auth-token-env` restricted to `wss://` and loopback `ws://` remotes. - Added a fast TUI startup probe for the default daemon socket, falling back to the embedded app server when the daemon is absent or unresponsive. ## Verification - Manually verified that the updated remote flow works. - Added coverage for UDS remote round trips, WebSocket auth headers, auth-token transport policy, remote address parsing, and missing-daemon fallback. - Ran focused remote test coverage locally.
This commit is contained in:
committed by
GitHub
Unverified
parent
7bf95b39aa
commit
ad572709ab
Generated
+2
@@ -1970,6 +1970,8 @@ dependencies = [
|
||||
"codex-exec-server",
|
||||
"codex-feedback",
|
||||
"codex-protocol",
|
||||
"codex-uds",
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-rustls-provider",
|
||||
"futures",
|
||||
"pretty_assertions",
|
||||
|
||||
@@ -21,6 +21,8 @@ codex-core = { workspace = true }
|
||||
codex-exec-server = { workspace = true }
|
||||
codex-feedback = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-uds = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-rustls-provider = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
||||
@@ -25,6 +25,7 @@ use std::io::Result as IoResult;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub use codex_app_server::app_server_control_socket_path;
|
||||
pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
||||
pub use codex_app_server::in_process::InProcessServerEvent;
|
||||
use codex_app_server::in_process::InProcessStartArgs;
|
||||
@@ -61,6 +62,7 @@ use tracing::warn;
|
||||
|
||||
pub use crate::remote::RemoteAppServerClient;
|
||||
pub use crate::remote::RemoteAppServerConnectArgs;
|
||||
pub use crate::remote::RemoteAppServerEndpoint;
|
||||
|
||||
/// Transitional access to core-only embedded app-server types.
|
||||
///
|
||||
@@ -952,6 +954,8 @@ mod tests {
|
||||
use codex_app_server_protocol::ToolRequestUserInputQuestion;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_core::init_state_db;
|
||||
use codex_uds::UnixListener;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -961,6 +965,7 @@ mod tests {
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::accept_async;
|
||||
use tokio_tungstenite::accept_hdr_async;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tokio_tungstenite::tungstenite::handshake::server::Request as WebSocketRequest;
|
||||
@@ -1100,9 +1105,10 @@ mod tests {
|
||||
format!("ws://{addr}")
|
||||
}
|
||||
|
||||
async fn expect_remote_initialize(
|
||||
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
||||
) {
|
||||
async fn expect_remote_initialize<S>(websocket: &mut tokio_tungstenite::WebSocketStream<S>)
|
||||
where
|
||||
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
|
||||
{
|
||||
let JSONRPCMessage::Request(request) = read_websocket_message(websocket).await else {
|
||||
panic!("expected initialize request");
|
||||
};
|
||||
@@ -1123,9 +1129,12 @@ mod tests {
|
||||
assert_eq!(notification.method, "initialized");
|
||||
}
|
||||
|
||||
async fn read_websocket_message(
|
||||
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
||||
) -> JSONRPCMessage {
|
||||
async fn read_websocket_message<S>(
|
||||
websocket: &mut tokio_tungstenite::WebSocketStream<S>,
|
||||
) -> JSONRPCMessage
|
||||
where
|
||||
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
|
||||
{
|
||||
loop {
|
||||
let frame = websocket
|
||||
.next()
|
||||
@@ -1145,10 +1154,12 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_websocket_message(
|
||||
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
||||
async fn write_websocket_message<S>(
|
||||
websocket: &mut tokio_tungstenite::WebSocketStream<S>,
|
||||
message: JSONRPCMessage,
|
||||
) {
|
||||
) where
|
||||
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
|
||||
{
|
||||
websocket
|
||||
.send(Message::Text(
|
||||
serde_json::to_string(&message)
|
||||
@@ -1213,8 +1224,10 @@ mod tests {
|
||||
|
||||
fn test_remote_connect_args(websocket_url: String) -> RemoteAppServerConnectArgs {
|
||||
RemoteAppServerConnectArgs {
|
||||
websocket_url,
|
||||
auth_token: None,
|
||||
endpoint: RemoteAppServerEndpoint::WebSocket {
|
||||
websocket_url,
|
||||
auth_token: None,
|
||||
},
|
||||
client_name: "codex-app-server-client-test".to_string(),
|
||||
client_version: "0.0.0-test".to_string(),
|
||||
experimental_api: true,
|
||||
@@ -1453,6 +1466,64 @@ mod tests {
|
||||
client.shutdown().await.expect("shutdown should complete");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_unix_socket_typed_request_roundtrip_works() {
|
||||
let socket_dir = TempDir::new().expect("socket dir");
|
||||
let socket_path = AbsolutePathBuf::from_absolute_path(socket_dir.path().join("codex.sock"))
|
||||
.expect("socket path should resolve");
|
||||
let mut listener = UnixListener::bind(socket_path.as_path())
|
||||
.await
|
||||
.expect("listener should bind");
|
||||
tokio::spawn(async move {
|
||||
let stream = listener.accept().await.expect("accept should succeed");
|
||||
let mut websocket = accept_async(stream)
|
||||
.await
|
||||
.expect("websocket upgrade should succeed");
|
||||
expect_remote_initialize(&mut websocket).await;
|
||||
let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
|
||||
else {
|
||||
panic!("expected account/read request");
|
||||
};
|
||||
assert_eq!(request.method, "account/read");
|
||||
write_websocket_message(
|
||||
&mut websocket,
|
||||
JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: request.id,
|
||||
result: serde_json::to_value(GetAccountResponse {
|
||||
account: None,
|
||||
requires_openai_auth: false,
|
||||
})
|
||||
.expect("response should serialize"),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
websocket.close(None).await.expect("close should succeed");
|
||||
});
|
||||
let client = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
|
||||
endpoint: RemoteAppServerEndpoint::UnixSocket { socket_path },
|
||||
client_name: "codex-app-server-client-test".to_string(),
|
||||
client_version: "0.0.0-test".to_string(),
|
||||
experimental_api: true,
|
||||
opt_out_notification_methods: Vec::new(),
|
||||
channel_capacity: 8,
|
||||
})
|
||||
.await
|
||||
.expect("remote client should connect");
|
||||
|
||||
let response: GetAccountResponse = client
|
||||
.request_typed(ClientRequest::GetAccount {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: codex_app_server_protocol::GetAccountParams {
|
||||
refresh_token: false,
|
||||
},
|
||||
})
|
||||
.await
|
||||
.expect("typed request should succeed");
|
||||
assert_eq!(response.account, None);
|
||||
|
||||
client.shutdown().await.expect("shutdown should complete");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_typed_request_accepts_large_single_frame_response() {
|
||||
let padding = "x".repeat((17 << 20) + 1024);
|
||||
@@ -1514,8 +1585,15 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
let client = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
|
||||
auth_token: Some(auth_token),
|
||||
..test_remote_connect_args(websocket_url)
|
||||
endpoint: RemoteAppServerEndpoint::WebSocket {
|
||||
websocket_url,
|
||||
auth_token: Some(auth_token),
|
||||
},
|
||||
client_name: "codex-app-server-client-test".to_string(),
|
||||
client_version: "0.0.0-test".to_string(),
|
||||
experimental_api: true,
|
||||
opt_out_notification_methods: Vec::new(),
|
||||
channel_capacity: 8,
|
||||
})
|
||||
.await
|
||||
.expect("remote client should connect");
|
||||
@@ -1526,9 +1604,15 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn remote_connect_rejects_non_loopback_ws_when_auth_configured() {
|
||||
let result = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
|
||||
websocket_url: "ws://example.com:4500".to_string(),
|
||||
auth_token: Some("remote-bearer-token".to_string()),
|
||||
..test_remote_connect_args("ws://127.0.0.1:1".to_string())
|
||||
endpoint: RemoteAppServerEndpoint::WebSocket {
|
||||
websocket_url: "ws://example.com:4500".to_string(),
|
||||
auth_token: Some("remote-bearer-token".to_string()),
|
||||
},
|
||||
client_name: "codex-app-server-client-test".to_string(),
|
||||
client_version: "0.0.0-test".to_string(),
|
||||
experimental_api: true,
|
||||
opt_out_notification_methods: Vec::new(),
|
||||
channel_capacity: 8,
|
||||
})
|
||||
.await;
|
||||
let err = match result {
|
||||
@@ -1706,13 +1790,8 @@ mod tests {
|
||||
})
|
||||
.await;
|
||||
let mut client = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
|
||||
websocket_url,
|
||||
auth_token: None,
|
||||
client_name: "codex-app-server-client-test".to_string(),
|
||||
client_version: "0.0.0-test".to_string(),
|
||||
experimental_api: true,
|
||||
opt_out_notification_methods: Vec::new(),
|
||||
channel_capacity: 1,
|
||||
..test_remote_connect_args(websocket_url)
|
||||
})
|
||||
.await
|
||||
.expect("remote client should connect");
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
/*
|
||||
This module implements the websocket-backed app-server client transport.
|
||||
This module implements the remote app-server client transport.
|
||||
|
||||
It owns the remote connection lifecycle, including the initialize/initialized
|
||||
handshake, JSON-RPC request/response routing, server-request resolution, and
|
||||
notification streaming. The rest of the crate uses the same `AppServerEvent`
|
||||
surface for both in-process and remote transports, so callers such as the TUI
|
||||
can switch between them without changing their higher-level session logic.
|
||||
notification streaming. Remote connections always carry WebSocket frames, over
|
||||
either TCP WebSocket URLs or local Unix sockets. The rest of the crate uses the
|
||||
same `AppServerEvent` surface for both in-process and remote transports, so
|
||||
callers such as the TUI can switch between them without changing their
|
||||
higher-level session logic.
|
||||
*/
|
||||
|
||||
use std::collections::HashMap;
|
||||
@@ -35,17 +37,23 @@ use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result as JsonRpcResult;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_uds::UnixStream;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::MaybeTlsStream;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::client_async_with_config;
|
||||
use tokio_tungstenite::connect_async_with_config;
|
||||
use tokio_tungstenite::tungstenite::Error as TungsteniteError;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
||||
use tokio_tungstenite::tungstenite::http::HeaderValue;
|
||||
@@ -57,18 +65,30 @@ use url::Url;
|
||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REMOTE_APP_SERVER_MAX_WEBSOCKET_MESSAGE_SIZE: usize = 128 << 20;
|
||||
// Tungstenite still needs an HTTP request URI for the WebSocket handshake;
|
||||
// the bytes travel over the Unix socket, not TCP.
|
||||
const UDS_WEBSOCKET_HANDSHAKE_URL: &str = "ws://localhost/rpc";
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum RemoteAppServerEndpoint {
|
||||
WebSocket {
|
||||
websocket_url: String,
|
||||
auth_token: Option<String>,
|
||||
},
|
||||
UnixSocket {
|
||||
socket_path: AbsolutePathBuf,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RemoteAppServerConnectArgs {
|
||||
pub websocket_url: String,
|
||||
pub auth_token: Option<String>,
|
||||
pub endpoint: RemoteAppServerEndpoint,
|
||||
pub client_name: String,
|
||||
pub client_version: String,
|
||||
pub experimental_api: bool,
|
||||
pub opt_out_notification_methods: Vec<String>,
|
||||
pub channel_capacity: usize,
|
||||
}
|
||||
|
||||
impl RemoteAppServerConnectArgs {
|
||||
fn initialize_params(&self) -> InitializeParams {
|
||||
let capabilities = InitializeCapabilities {
|
||||
@@ -141,69 +161,39 @@ pub struct RemoteAppServerRequestHandle {
|
||||
impl RemoteAppServerClient {
|
||||
pub async fn connect(args: RemoteAppServerConnectArgs) -> IoResult<Self> {
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let websocket_url = args.websocket_url.clone();
|
||||
let url = Url::parse(&websocket_url).map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid websocket URL `{websocket_url}`: {err}"),
|
||||
)
|
||||
})?;
|
||||
if args.auth_token.is_some() && !websocket_url_supports_auth_token(&url) {
|
||||
return Err(IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!(
|
||||
"remote auth tokens require `wss://` or loopback `ws://` URLs; got `{websocket_url}`"
|
||||
),
|
||||
));
|
||||
let initialize_params = args.initialize_params();
|
||||
match args.endpoint {
|
||||
RemoteAppServerEndpoint::WebSocket {
|
||||
websocket_url,
|
||||
auth_token,
|
||||
} => {
|
||||
let (endpoint, stream) =
|
||||
connect_websocket_endpoint(websocket_url, auth_token).await?;
|
||||
Self::connect_with_stream(channel_capacity, endpoint, stream, initialize_params)
|
||||
.await
|
||||
}
|
||||
RemoteAppServerEndpoint::UnixSocket { socket_path } => {
|
||||
let (endpoint, stream) = connect_unix_socket_endpoint(socket_path).await?;
|
||||
Self::connect_with_stream(channel_capacity, endpoint, stream, initialize_params)
|
||||
.await
|
||||
}
|
||||
}
|
||||
let mut request = url.as_str().into_client_request().map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid websocket URL `{websocket_url}`: {err}"),
|
||||
)
|
||||
})?;
|
||||
if let Some(auth_token) = args.auth_token.as_deref() {
|
||||
let header_value =
|
||||
HeaderValue::from_str(&format!("Bearer {auth_token}")).map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid remote authorization header value: {err}"),
|
||||
)
|
||||
})?;
|
||||
request.headers_mut().insert(AUTHORIZATION, header_value);
|
||||
}
|
||||
ensure_rustls_crypto_provider();
|
||||
// Remote resume responses can legitimately carry large thread histories.
|
||||
// Keep a bounded cap, but raise it above tungstenite's 16 MiB frame default.
|
||||
let websocket_config = WebSocketConfig::default()
|
||||
.max_frame_size(Some(REMOTE_APP_SERVER_MAX_WEBSOCKET_MESSAGE_SIZE))
|
||||
.max_message_size(Some(REMOTE_APP_SERVER_MAX_WEBSOCKET_MESSAGE_SIZE));
|
||||
let stream = timeout(
|
||||
CONNECT_TIMEOUT,
|
||||
connect_async_with_config(
|
||||
request,
|
||||
Some(websocket_config),
|
||||
/*disable_nagle*/ false,
|
||||
),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::TimedOut,
|
||||
format!("timed out connecting to remote app server at `{websocket_url}`"),
|
||||
)
|
||||
})?
|
||||
.map(|(stream, _response)| stream)
|
||||
.map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"failed to connect to remote app server at `{websocket_url}`: {err}"
|
||||
))
|
||||
})?;
|
||||
}
|
||||
|
||||
async fn connect_with_stream<S>(
|
||||
channel_capacity: usize,
|
||||
endpoint: String,
|
||||
stream: WebSocketStream<S>,
|
||||
initialize_params: InitializeParams,
|
||||
) -> IoResult<Self>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
let mut stream = stream;
|
||||
let pending_events = initialize_remote_connection(
|
||||
&mut stream,
|
||||
&websocket_url,
|
||||
args.initialize_params(),
|
||||
&endpoint,
|
||||
initialize_params,
|
||||
INITIALIZE_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
@@ -235,13 +225,13 @@ impl RemoteAppServerClient {
|
||||
if let Err(err) = write_jsonrpc_message(
|
||||
&mut stream,
|
||||
JSONRPCMessage::Request(jsonrpc_request_from_client_request(*request)),
|
||||
&websocket_url,
|
||||
&endpoint,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let err_message = err.to_string();
|
||||
let message = format!(
|
||||
"remote app server at `{websocket_url}` write failed: {err_message}"
|
||||
"remote app server at `{endpoint}` write failed: {err_message}"
|
||||
);
|
||||
if let Some(response_tx) = pending_requests.remove(&request_id) {
|
||||
let _ = response_tx.send(Err(err));
|
||||
@@ -262,7 +252,7 @@ impl RemoteAppServerClient {
|
||||
JSONRPCMessage::Notification(
|
||||
jsonrpc_notification_from_client_notification(notification),
|
||||
),
|
||||
&websocket_url,
|
||||
&endpoint,
|
||||
)
|
||||
.await;
|
||||
let _ = response_tx.send(result);
|
||||
@@ -278,7 +268,7 @@ impl RemoteAppServerClient {
|
||||
id: request_id,
|
||||
result,
|
||||
}),
|
||||
&websocket_url,
|
||||
&endpoint,
|
||||
)
|
||||
.await;
|
||||
let _ = response_tx.send(result);
|
||||
@@ -294,16 +284,20 @@ impl RemoteAppServerClient {
|
||||
error,
|
||||
id: request_id,
|
||||
}),
|
||||
&websocket_url,
|
||||
&endpoint,
|
||||
)
|
||||
.await;
|
||||
let _ = response_tx.send(result);
|
||||
}
|
||||
RemoteClientCommand::Shutdown { response_tx } => {
|
||||
let close_result = stream.close(None).await.map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"failed to close websocket app server `{websocket_url}`: {err}"
|
||||
))
|
||||
let close_result = stream.close(None).await.or_else(|err| {
|
||||
if websocket_close_error_is_already_closed(&err) {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(IoError::other(format!(
|
||||
"failed to close websocket app server `{endpoint}`: {err}"
|
||||
)))
|
||||
}
|
||||
});
|
||||
let _ = response_tx.send(close_result);
|
||||
break;
|
||||
@@ -364,13 +358,13 @@ impl RemoteAppServerClient {
|
||||
},
|
||||
id: request_id,
|
||||
}),
|
||||
&websocket_url,
|
||||
&endpoint,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let err_message = reject_err.to_string();
|
||||
let message = format!(
|
||||
"remote app server at `{websocket_url}` write failed: {err_message}"
|
||||
"remote app server at `{endpoint}` write failed: {err_message}"
|
||||
);
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
@@ -387,7 +381,7 @@ impl RemoteAppServerClient {
|
||||
}
|
||||
Err(err) => {
|
||||
let message = format!(
|
||||
"remote app server at `{websocket_url}` sent invalid JSON-RPC: {err}"
|
||||
"remote app server at `{endpoint}` sent invalid JSON-RPC: {err}"
|
||||
);
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
@@ -408,7 +402,7 @@ impl RemoteAppServerClient {
|
||||
.filter(|reason| !reason.is_empty())
|
||||
.unwrap_or_else(|| "connection closed".to_string());
|
||||
let message = format!(
|
||||
"remote app server at `{websocket_url}` disconnected: {reason}"
|
||||
"remote app server at `{endpoint}` disconnected: {reason}"
|
||||
);
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
@@ -428,7 +422,7 @@ impl RemoteAppServerClient {
|
||||
| Some(Ok(Message::Frame(_))) => {}
|
||||
Some(Err(err)) => {
|
||||
let message = format!(
|
||||
"remote app server at `{websocket_url}` transport failed: {err}"
|
||||
"remote app server at `{endpoint}` transport failed: {err}"
|
||||
);
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
@@ -441,7 +435,7 @@ impl RemoteAppServerClient {
|
||||
}
|
||||
None => {
|
||||
let message = format!(
|
||||
"remote app server at `{websocket_url}` closed the connection"
|
||||
"remote app server at `{endpoint}` closed the connection"
|
||||
);
|
||||
let _ = deliver_event(
|
||||
&event_tx,
|
||||
@@ -678,12 +672,131 @@ impl RemoteAppServerRequestHandle {
|
||||
}
|
||||
}
|
||||
|
||||
async fn initialize_remote_connection(
|
||||
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||
websocket_url: &str,
|
||||
async fn connect_websocket_endpoint(
|
||||
websocket_url: String,
|
||||
auth_token: Option<String>,
|
||||
) -> IoResult<(String, WebSocketStream<MaybeTlsStream<TcpStream>>)> {
|
||||
let url = Url::parse(&websocket_url).map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid websocket URL `{websocket_url}`: {err}"),
|
||||
)
|
||||
})?;
|
||||
if auth_token.is_some() && !websocket_url_supports_auth_token(&url) {
|
||||
return Err(IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!(
|
||||
"remote auth tokens require `wss://` or loopback `ws://` URLs; got `{websocket_url}`"
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let mut request = url.as_str().into_client_request().map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid websocket URL `{websocket_url}`: {err}"),
|
||||
)
|
||||
})?;
|
||||
if let Some(auth_token) = auth_token.as_deref() {
|
||||
let header_value =
|
||||
HeaderValue::from_str(&format!("Bearer {auth_token}")).map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid remote authorization header value: {err}"),
|
||||
)
|
||||
})?;
|
||||
request.headers_mut().insert(AUTHORIZATION, header_value);
|
||||
}
|
||||
|
||||
ensure_rustls_crypto_provider();
|
||||
let websocket_config = remote_websocket_config();
|
||||
let stream = timeout(
|
||||
CONNECT_TIMEOUT,
|
||||
connect_async_with_config(
|
||||
request,
|
||||
Some(websocket_config),
|
||||
/*disable_nagle*/ false,
|
||||
),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::TimedOut,
|
||||
format!("timed out connecting to remote app server at `{websocket_url}`"),
|
||||
)
|
||||
})?
|
||||
.map(|(stream, _response)| stream)
|
||||
.map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"failed to connect to remote app server at `{websocket_url}`: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok((websocket_url, stream))
|
||||
}
|
||||
|
||||
async fn connect_unix_socket_endpoint(
|
||||
socket_path: AbsolutePathBuf,
|
||||
) -> IoResult<(String, WebSocketStream<UnixStream>)> {
|
||||
let endpoint = format!("unix://{}", socket_path.display());
|
||||
let request = UDS_WEBSOCKET_HANDSHAKE_URL
|
||||
.into_client_request()
|
||||
.map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::InvalidInput,
|
||||
format!("invalid UDS websocket handshake URL: {err}"),
|
||||
)
|
||||
})?;
|
||||
let stream = timeout(CONNECT_TIMEOUT, UnixStream::connect(socket_path.as_path()))
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::TimedOut,
|
||||
format!("timed out connecting to remote app server at `{endpoint}`"),
|
||||
)
|
||||
})?
|
||||
.map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"failed to connect to remote app server at `{endpoint}`: {err}"
|
||||
))
|
||||
})?;
|
||||
let websocket_config = remote_websocket_config();
|
||||
let stream = timeout(
|
||||
CONNECT_TIMEOUT,
|
||||
client_async_with_config(request, stream, Some(websocket_config)),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::TimedOut,
|
||||
format!("timed out upgrading remote app server at `{endpoint}`"),
|
||||
)
|
||||
})?
|
||||
.map(|(stream, _response)| stream)
|
||||
.map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"failed to upgrade remote app server at `{endpoint}`: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok((endpoint, stream))
|
||||
}
|
||||
|
||||
fn remote_websocket_config() -> WebSocketConfig {
|
||||
WebSocketConfig::default()
|
||||
.max_frame_size(Some(REMOTE_APP_SERVER_MAX_WEBSOCKET_MESSAGE_SIZE))
|
||||
.max_message_size(Some(REMOTE_APP_SERVER_MAX_WEBSOCKET_MESSAGE_SIZE))
|
||||
}
|
||||
|
||||
async fn initialize_remote_connection<S>(
|
||||
stream: &mut WebSocketStream<S>,
|
||||
endpoint: &str,
|
||||
params: InitializeParams,
|
||||
initialize_timeout: Duration,
|
||||
) -> IoResult<Vec<AppServerEvent>> {
|
||||
) -> IoResult<Vec<AppServerEvent>>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
let initialize_request_id = RequestId::String("initialize".to_string());
|
||||
let mut pending_events = Vec::new();
|
||||
write_jsonrpc_message(
|
||||
@@ -694,7 +807,7 @@ async fn initialize_remote_connection(
|
||||
params,
|
||||
},
|
||||
)),
|
||||
websocket_url,
|
||||
endpoint,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -704,7 +817,7 @@ async fn initialize_remote_connection(
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
let message = serde_json::from_str::<JSONRPCMessage>(&text).map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"remote app server at `{websocket_url}` sent invalid initialize response: {err}"
|
||||
"remote app server at `{endpoint}` sent invalid initialize response: {err}"
|
||||
))
|
||||
})?;
|
||||
match message {
|
||||
@@ -713,7 +826,7 @@ async fn initialize_remote_connection(
|
||||
}
|
||||
JSONRPCMessage::Error(error) if error.id == initialize_request_id => {
|
||||
break Err(IoError::other(format!(
|
||||
"remote app server at `{websocket_url}` rejected initialize: {}",
|
||||
"remote app server at `{endpoint}` rejected initialize: {}",
|
||||
error.error.message
|
||||
)));
|
||||
}
|
||||
@@ -743,7 +856,7 @@ async fn initialize_remote_connection(
|
||||
},
|
||||
id: request_id,
|
||||
}),
|
||||
websocket_url,
|
||||
endpoint,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -765,19 +878,19 @@ async fn initialize_remote_connection(
|
||||
break Err(IoError::new(
|
||||
ErrorKind::ConnectionAborted,
|
||||
format!(
|
||||
"remote app server at `{websocket_url}` closed during initialize: {reason}"
|
||||
"remote app server at `{endpoint}` closed during initialize: {reason}"
|
||||
),
|
||||
));
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
break Err(IoError::other(format!(
|
||||
"remote app server at `{websocket_url}` transport failed during initialize: {err}"
|
||||
"remote app server at `{endpoint}` transport failed during initialize: {err}"
|
||||
)));
|
||||
}
|
||||
None => {
|
||||
break Err(IoError::new(
|
||||
ErrorKind::UnexpectedEof,
|
||||
format!("remote app server at `{websocket_url}` closed during initialize"),
|
||||
format!("remote app server at `{endpoint}` closed during initialize"),
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -787,7 +900,7 @@ async fn initialize_remote_connection(
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::TimedOut,
|
||||
format!("timed out waiting for initialize response from `{websocket_url}`"),
|
||||
format!("timed out waiting for initialize response from `{endpoint}`"),
|
||||
)
|
||||
})??;
|
||||
|
||||
@@ -796,7 +909,7 @@ async fn initialize_remote_connection(
|
||||
JSONRPCMessage::Notification(jsonrpc_notification_from_client_notification(
|
||||
ClientNotification::Initialized,
|
||||
)),
|
||||
websocket_url,
|
||||
endpoint,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -850,21 +963,35 @@ fn jsonrpc_notification_from_client_notification(
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_jsonrpc_message(
|
||||
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||
async fn write_jsonrpc_message<S>(
|
||||
stream: &mut WebSocketStream<S>,
|
||||
message: JSONRPCMessage,
|
||||
websocket_url: &str,
|
||||
) -> IoResult<()> {
|
||||
endpoint: &str,
|
||||
) -> IoResult<()>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
let payload = serde_json::to_string(&message).map_err(IoError::other)?;
|
||||
stream
|
||||
.send(Message::Text(payload.into()))
|
||||
.await
|
||||
.map_err(|err| {
|
||||
IoError::other(format!(
|
||||
"failed to write websocket message to `{websocket_url}`: {err}"
|
||||
"failed to write websocket message to `{endpoint}`: {err}"
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
fn websocket_close_error_is_already_closed(err: &TungsteniteError) -> bool {
|
||||
match err {
|
||||
TungsteniteError::ConnectionClosed | TungsteniteError::AlreadyClosed => true,
|
||||
TungsteniteError::Io(err) => matches!(
|
||||
err.kind(),
|
||||
ErrorKind::BrokenPipe | ErrorKind::ConnectionReset | ErrorKind::NotConnected
|
||||
),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
+33
-26
@@ -722,9 +722,9 @@ struct FeatureToggles {
|
||||
|
||||
#[derive(Debug, Default, Parser, Clone)]
|
||||
struct InteractiveRemoteOptions {
|
||||
/// Connect the TUI to a remote app server websocket endpoint.
|
||||
/// Connect the TUI to a remote app server endpoint.
|
||||
///
|
||||
/// Accepted forms: `ws://host:port` or `wss://host:port`.
|
||||
/// Accepted forms: `ws://host:port`, `wss://host:port`, `unix://`, or `unix://PATH`.
|
||||
#[arg(long = "remote", value_name = "ADDR")]
|
||||
remote: Option<String>,
|
||||
|
||||
@@ -1709,27 +1709,39 @@ async fn run_interactive_tui(
|
||||
}
|
||||
}
|
||||
|
||||
let normalized_remote = remote
|
||||
let mut remote_endpoint = remote
|
||||
.as_deref()
|
||||
.map(codex_tui::normalize_remote_addr)
|
||||
.map(codex_tui::resolve_remote_addr)
|
||||
.transpose()
|
||||
.map_err(std::io::Error::other)?;
|
||||
if remote_auth_token_env.is_some() && normalized_remote.is_none() {
|
||||
return Ok(AppExitInfo::fatal(
|
||||
"`--remote-auth-token-env` requires `--remote`.",
|
||||
));
|
||||
if let Some(remote_auth_token_env) = remote_auth_token_env {
|
||||
let Some(endpoint) = remote_endpoint.as_mut() else {
|
||||
return Ok(AppExitInfo::fatal(
|
||||
"`--remote-auth-token-env` requires `--remote`.",
|
||||
));
|
||||
};
|
||||
if !codex_tui::remote_addr_supports_auth_token(endpoint) {
|
||||
return Ok(AppExitInfo::fatal(
|
||||
"`--remote-auth-token-env` requires a `wss://` or loopback `ws://` remote.",
|
||||
));
|
||||
}
|
||||
let auth_token = read_remote_auth_token_from_env_var(&remote_auth_token_env)
|
||||
.map_err(std::io::Error::other)?;
|
||||
let codex_tui::RemoteAppServerEndpoint::WebSocket {
|
||||
auth_token: slot, ..
|
||||
} = endpoint
|
||||
else {
|
||||
return Ok(AppExitInfo::fatal(
|
||||
"`--remote-auth-token-env` requires a `wss://` or loopback `ws://` remote.",
|
||||
));
|
||||
};
|
||||
*slot = Some(auth_token);
|
||||
}
|
||||
let remote_auth_token = remote_auth_token_env
|
||||
.as_deref()
|
||||
.map(read_remote_auth_token_from_env_var)
|
||||
.transpose()
|
||||
.map_err(std::io::Error::other)?;
|
||||
codex_tui::run_main(
|
||||
interactive,
|
||||
arg0_paths,
|
||||
codex_config::LoaderOverrides::default(),
|
||||
normalized_remote,
|
||||
remote_auth_token,
|
||||
remote_endpoint,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -2411,13 +2423,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn reject_remote_flag_for_remote_control() {
|
||||
let cli = MultitoolCli::try_parse_from([
|
||||
"codex",
|
||||
"--remote",
|
||||
"ws://127.0.0.1:1234",
|
||||
"remote-control",
|
||||
])
|
||||
.expect("parse");
|
||||
let cli = MultitoolCli::try_parse_from(["codex", "--remote", "unix://", "remote-control"])
|
||||
.expect("parse");
|
||||
assert_matches!(cli.subcommand, Some(Subcommand::RemoteControl));
|
||||
|
||||
let err = reject_remote_mode_for_subcommand(
|
||||
@@ -2432,9 +2439,9 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn remote_flag_parses_for_interactive_root() {
|
||||
let cli = MultitoolCli::try_parse_from(["codex", "--remote", "ws://127.0.0.1:4500"])
|
||||
let cli = MultitoolCli::try_parse_from(["codex", "--remote", "unix://codex.sock"])
|
||||
.expect("parse");
|
||||
assert_eq!(cli.remote.remote.as_deref(), Some("ws://127.0.0.1:4500"));
|
||||
assert_eq!(cli.remote.remote.as_deref(), Some("unix://codex.sock"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -2456,14 +2463,14 @@ mod tests {
|
||||
#[test]
|
||||
fn remote_flag_parses_for_resume_subcommand() {
|
||||
let cli =
|
||||
MultitoolCli::try_parse_from(["codex", "resume", "--remote", "ws://127.0.0.1:4500"])
|
||||
MultitoolCli::try_parse_from(["codex", "resume", "--remote", "unix://codex.sock"])
|
||||
.expect("parse");
|
||||
let Subcommand::Resume(ResumeCommand { remote, .. }) =
|
||||
cli.subcommand.expect("resume present")
|
||||
else {
|
||||
panic!("expected resume subcommand");
|
||||
};
|
||||
assert_eq!(remote.remote.as_deref(), Some("ws://127.0.0.1:4500"));
|
||||
assert_eq!(remote.remote.as_deref(), Some("unix://codex.sock"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -80,6 +80,7 @@ use crate::workspace_command::AppServerWorkspaceCommandRunner;
|
||||
use crate::workspace_command::WorkspaceCommandRunner;
|
||||
use codex_ansi_escape::ansi_escape_line;
|
||||
use codex_app_server_client::AppServerRequestHandle;
|
||||
use codex_app_server_client::RemoteAppServerEndpoint;
|
||||
use codex_app_server_client::TypedRequestError;
|
||||
use codex_app_server_protocol::AddCreditsNudgeCreditType;
|
||||
use codex_app_server_protocol::AskForApproval;
|
||||
@@ -498,8 +499,7 @@ pub(crate) struct App {
|
||||
pub(crate) feedback: codex_feedback::CodexFeedback,
|
||||
feedback_audience: FeedbackAudience,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
remote_app_server_url: Option<String>,
|
||||
remote_app_server_auth_token: Option<String>,
|
||||
remote_app_server_endpoint: Option<RemoteAppServerEndpoint>,
|
||||
/// Set when the user confirms an update; propagated on exit.
|
||||
pub(crate) pending_update_action: Option<UpdateAction>,
|
||||
|
||||
@@ -637,8 +637,7 @@ impl App {
|
||||
is_first_run: bool,
|
||||
entered_trust_nux: bool,
|
||||
should_prompt_windows_sandbox_nux_at_startup: bool,
|
||||
remote_app_server_url: Option<String>,
|
||||
remote_app_server_auth_token: Option<String>,
|
||||
remote_app_server_endpoint: Option<RemoteAppServerEndpoint>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
startup_hooks_browser: Option<HooksListEntry>,
|
||||
@@ -919,8 +918,7 @@ See the Codex keymap documentation for supported actions and examples."
|
||||
feedback: feedback.clone(),
|
||||
feedback_audience,
|
||||
environment_manager,
|
||||
remote_app_server_url,
|
||||
remote_app_server_auth_token,
|
||||
remote_app_server_endpoint,
|
||||
pending_update_action: None,
|
||||
pending_shutdown_exit_thread_id: None,
|
||||
windows_sandbox: WindowsSandboxState::default(),
|
||||
|
||||
@@ -57,11 +57,8 @@ impl App {
|
||||
AppEvent::OpenResumePicker => {
|
||||
let picker_app_server = match crate::start_app_server_for_picker(
|
||||
&self.config,
|
||||
&match self.remote_app_server_url.clone() {
|
||||
Some(websocket_url) => crate::AppServerTarget::Remote {
|
||||
websocket_url,
|
||||
auth_token: self.remote_app_server_auth_token.clone(),
|
||||
},
|
||||
&match self.remote_app_server_endpoint.clone() {
|
||||
Some(endpoint) => crate::AppServerTarget::Remote { endpoint },
|
||||
None => crate::AppServerTarget::Embedded,
|
||||
},
|
||||
self.state_db.clone(),
|
||||
|
||||
@@ -633,7 +633,7 @@ impl App {
|
||||
}
|
||||
|
||||
let current_cwd = self.config.cwd.to_path_buf();
|
||||
let resume_cwd = if self.remote_app_server_url.is_some() {
|
||||
let resume_cwd = if self.remote_app_server_endpoint.is_some() {
|
||||
current_cwd.clone()
|
||||
} else {
|
||||
match crate::session_resume::resolve_cwd_for_resume_or_fork(
|
||||
|
||||
@@ -44,8 +44,7 @@ pub(super) async fn make_test_app() -> App {
|
||||
feedback: codex_feedback::CodexFeedback::new(),
|
||||
feedback_audience: FeedbackAudience::External,
|
||||
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
|
||||
remote_app_server_url: None,
|
||||
remote_app_server_auth_token: None,
|
||||
remote_app_server_endpoint: None,
|
||||
pending_update_action: None,
|
||||
pending_shutdown_exit_thread_id: None,
|
||||
windows_sandbox: WindowsSandboxState::default(),
|
||||
|
||||
@@ -3992,8 +3992,7 @@ async fn make_test_app() -> App {
|
||||
feedback: codex_feedback::CodexFeedback::new(),
|
||||
feedback_audience: FeedbackAudience::External,
|
||||
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
|
||||
remote_app_server_url: None,
|
||||
remote_app_server_auth_token: None,
|
||||
remote_app_server_endpoint: None,
|
||||
pending_update_action: None,
|
||||
pending_shutdown_exit_thread_id: None,
|
||||
windows_sandbox: WindowsSandboxState::default(),
|
||||
@@ -4055,8 +4054,7 @@ async fn make_test_app_with_channels() -> (
|
||||
feedback: codex_feedback::CodexFeedback::new(),
|
||||
feedback_audience: FeedbackAudience::External,
|
||||
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
|
||||
remote_app_server_url: None,
|
||||
remote_app_server_auth_token: None,
|
||||
remote_app_server_endpoint: None,
|
||||
pending_update_action: None,
|
||||
pending_shutdown_exit_thread_id: None,
|
||||
windows_sandbox: WindowsSandboxState::default(),
|
||||
|
||||
+189
-126
@@ -25,6 +25,7 @@ use codex_app_server_client::InProcessAppServerClient;
|
||||
use codex_app_server_client::InProcessClientStartArgs;
|
||||
use codex_app_server_client::RemoteAppServerClient;
|
||||
use codex_app_server_client::RemoteAppServerConnectArgs;
|
||||
pub use codex_app_server_client::RemoteAppServerEndpoint;
|
||||
use codex_app_server_protocol::Account as AppServerAccount;
|
||||
use codex_app_server_protocol::AskForApproval;
|
||||
use codex_app_server_protocol::AuthMode as AppServerAuthMode;
|
||||
@@ -272,6 +273,10 @@ pub use public_widgets::composer_input::ComposerAction;
|
||||
pub use public_widgets::composer_input::ComposerInput;
|
||||
// (tests access modules directly within the crate)
|
||||
|
||||
#[cfg(unix)]
|
||||
const AUTO_CONNECT_DAEMON_CONNECT_TIMEOUT: std::time::Duration =
|
||||
std::time::Duration::from_millis(50);
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn start_embedded_app_server(
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
@@ -302,10 +307,7 @@ async fn start_embedded_app_server(
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub(crate) enum AppServerTarget {
|
||||
Embedded,
|
||||
Remote {
|
||||
websocket_url: String,
|
||||
auth_token: Option<String>,
|
||||
},
|
||||
Remote { endpoint: RemoteAppServerEndpoint },
|
||||
}
|
||||
|
||||
fn remote_addr_has_explicit_port(addr: &str, parsed: &Url) -> bool {
|
||||
@@ -347,12 +349,24 @@ fn websocket_url_supports_auth_token(parsed: &Url) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn normalize_remote_addr(addr: &str) -> color_eyre::Result<String> {
|
||||
pub fn resolve_remote_addr(addr: &str) -> color_eyre::Result<RemoteAppServerEndpoint> {
|
||||
if let Some(socket_path) = addr.strip_prefix("unix://") {
|
||||
let socket_path = if socket_path.is_empty() {
|
||||
let codex_home = find_codex_home().wrap_err("failed to resolve CODEX_HOME")?;
|
||||
codex_app_server_client::app_server_control_socket_path(&codex_home)
|
||||
.map_err(color_eyre::Report::new)?
|
||||
} else {
|
||||
AbsolutePathBuf::relative_to_current_dir(socket_path)
|
||||
.map_err(color_eyre::Report::new)?
|
||||
};
|
||||
return Ok(RemoteAppServerEndpoint::UnixSocket { socket_path });
|
||||
}
|
||||
|
||||
let parsed = match Url::parse(addr) {
|
||||
Ok(parsed) => parsed,
|
||||
Err(_) => {
|
||||
color_eyre::eyre::bail!(
|
||||
"invalid remote address `{addr}`; expected `ws://host:port` or `wss://host:port`"
|
||||
"invalid remote address `{addr}`; expected `ws://host:port`, `wss://host:port`, `unix://`, or `unix://PATH`"
|
||||
);
|
||||
}
|
||||
};
|
||||
@@ -363,32 +377,31 @@ pub fn normalize_remote_addr(addr: &str) -> color_eyre::Result<String> {
|
||||
&& parsed.query().is_none()
|
||||
&& parsed.fragment().is_none()
|
||||
{
|
||||
return Ok(parsed.to_string());
|
||||
return Ok(RemoteAppServerEndpoint::WebSocket {
|
||||
websocket_url: parsed.to_string(),
|
||||
auth_token: None,
|
||||
});
|
||||
}
|
||||
|
||||
color_eyre::eyre::bail!(
|
||||
"invalid remote address `{addr}`; expected `ws://host:port` or `wss://host:port`"
|
||||
"invalid remote address `{addr}`; expected `ws://host:port`, `wss://host:port`, `unix://`, or `unix://PATH`"
|
||||
);
|
||||
}
|
||||
|
||||
fn validate_remote_auth_token_transport(websocket_url: &str) -> color_eyre::Result<()> {
|
||||
let parsed = Url::parse(websocket_url).map_err(color_eyre::Report::new)?;
|
||||
if websocket_url_supports_auth_token(&parsed) {
|
||||
return Ok(());
|
||||
pub fn remote_addr_supports_auth_token(endpoint: &RemoteAppServerEndpoint) -> bool {
|
||||
match endpoint {
|
||||
RemoteAppServerEndpoint::WebSocket { websocket_url, .. } => {
|
||||
Url::parse(websocket_url).is_ok_and(|parsed| websocket_url_supports_auth_token(&parsed))
|
||||
}
|
||||
RemoteAppServerEndpoint::UnixSocket { .. } => false,
|
||||
}
|
||||
|
||||
color_eyre::eyre::bail!(
|
||||
"remote auth tokens require `wss://` or loopback `ws://` URLs; got `{websocket_url}`"
|
||||
)
|
||||
}
|
||||
|
||||
async fn connect_remote_app_server(
|
||||
websocket_url: String,
|
||||
auth_token: Option<String>,
|
||||
endpoint: RemoteAppServerEndpoint,
|
||||
) -> color_eyre::Result<AppServerClient> {
|
||||
let app_server = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
|
||||
websocket_url,
|
||||
auth_token,
|
||||
endpoint,
|
||||
client_name: "codex-tui".to_string(),
|
||||
client_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
experimental_api: true,
|
||||
@@ -400,6 +413,40 @@ async fn connect_remote_app_server(
|
||||
Ok(AppServerClient::Remote(app_server))
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn maybe_probe_default_daemon_socket(codex_home: &Path) -> Option<AbsolutePathBuf> {
|
||||
let socket_path = codex_app_server_client::app_server_control_socket_path(codex_home).ok()?;
|
||||
if !socket_path.as_path().try_exists().unwrap_or(false) {
|
||||
return None;
|
||||
}
|
||||
|
||||
match tokio::time::timeout(
|
||||
AUTO_CONNECT_DAEMON_CONNECT_TIMEOUT,
|
||||
tokio::net::UnixStream::connect(socket_path.as_path()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(_stream)) => Some(socket_path),
|
||||
Ok(Err(err)) => {
|
||||
tracing::debug!(%err, socket_path = %socket_path.display(), "skipping default app-server daemon socket");
|
||||
None
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::debug!(
|
||||
socket_path = %socket_path.display(),
|
||||
timeout_ms = AUTO_CONNECT_DAEMON_CONNECT_TIMEOUT.as_millis(),
|
||||
"timed out probing default app-server daemon socket"
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
async fn maybe_probe_default_daemon_socket(_codex_home: &Path) -> Option<AbsolutePathBuf> {
|
||||
None
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn start_app_server(
|
||||
target: &AppServerTarget,
|
||||
@@ -427,10 +474,7 @@ async fn start_app_server(
|
||||
)
|
||||
.await
|
||||
.map(AppServerClient::InProcess),
|
||||
AppServerTarget::Remote {
|
||||
websocket_url,
|
||||
auth_token,
|
||||
} => connect_remote_app_server(websocket_url.clone(), auth_token.clone()).await,
|
||||
AppServerTarget::Remote { endpoint } => connect_remote_app_server(endpoint.clone()).await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -714,24 +758,8 @@ pub async fn run_main(
|
||||
mut cli: Cli,
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
loader_overrides: LoaderOverrides,
|
||||
remote: Option<String>,
|
||||
remote_auth_token: Option<String>,
|
||||
explicit_remote_endpoint: Option<RemoteAppServerEndpoint>,
|
||||
) -> std::io::Result<AppExitInfo> {
|
||||
let remote_url = remote;
|
||||
if let (Some(websocket_url), Some(_)) = (remote_url.as_deref(), remote_auth_token.as_ref()) {
|
||||
validate_remote_auth_token_transport(websocket_url).map_err(std::io::Error::other)?;
|
||||
}
|
||||
let app_server_target = remote_url
|
||||
.clone()
|
||||
.map(|websocket_url| AppServerTarget::Remote {
|
||||
websocket_url,
|
||||
auth_token: remote_auth_token.clone(),
|
||||
})
|
||||
.unwrap_or(AppServerTarget::Embedded);
|
||||
let remote_cwd_override = cli
|
||||
.cwd
|
||||
.clone()
|
||||
.filter(|_| matches!(app_server_target, AppServerTarget::Remote { .. }));
|
||||
let (sandbox_mode, approval_policy) = if cli.dangerously_bypass_approvals_and_sandbox {
|
||||
(
|
||||
Some(SandboxMode::DangerFullAccess),
|
||||
@@ -776,6 +804,22 @@ pub async fn run_main(
|
||||
}
|
||||
};
|
||||
|
||||
let remote_endpoint = match explicit_remote_endpoint {
|
||||
Some(endpoint) => Some(endpoint),
|
||||
None => maybe_probe_default_daemon_socket(&codex_home)
|
||||
.await
|
||||
.map(|socket_path| RemoteAppServerEndpoint::UnixSocket { socket_path }),
|
||||
};
|
||||
let app_server_target = remote_endpoint
|
||||
.clone()
|
||||
.map_or(AppServerTarget::Embedded, |endpoint| {
|
||||
AppServerTarget::Remote { endpoint }
|
||||
});
|
||||
let remote_cwd_override = cli
|
||||
.cwd
|
||||
.clone()
|
||||
.filter(|_| matches!(app_server_target, AppServerTarget::Remote { .. }));
|
||||
|
||||
let local_runtime_paths = ExecServerRuntimePaths::from_optional_paths(
|
||||
arg0_paths.codex_self_exe.clone(),
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
@@ -1098,8 +1142,7 @@ pub async fn run_main(
|
||||
feedback,
|
||||
log_db,
|
||||
state_db,
|
||||
remote_url,
|
||||
remote_auth_token,
|
||||
remote_endpoint,
|
||||
environment_manager,
|
||||
)
|
||||
.await
|
||||
@@ -1120,8 +1163,7 @@ async fn run_ratatui_app(
|
||||
feedback: codex_feedback::CodexFeedback,
|
||||
log_db: Option<log_db::LogDbLayer>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
remote_url: Option<String>,
|
||||
remote_auth_token: Option<String>,
|
||||
remote_endpoint: Option<RemoteAppServerEndpoint>,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
) -> color_eyre::Result<AppExitInfo> {
|
||||
let remote_mode = matches!(&app_server_target, AppServerTarget::Remote { .. });
|
||||
@@ -1169,30 +1211,29 @@ async fn run_ratatui_app(
|
||||
// Initialize high-fidelity session event logging if enabled.
|
||||
session_log::maybe_init(&initial_config);
|
||||
|
||||
let mut app_server = Some(
|
||||
match start_app_server(
|
||||
&app_server_target,
|
||||
arg0_paths.clone(),
|
||||
initial_config.clone(),
|
||||
cli_kv_overrides.clone(),
|
||||
loader_overrides.clone(),
|
||||
cloud_requirements.clone(),
|
||||
feedback.clone(),
|
||||
log_db.clone(),
|
||||
state_db.clone(),
|
||||
environment_manager.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(app_server) => AppServerSession::new(app_server)
|
||||
.with_remote_cwd_override(remote_cwd_override.clone()),
|
||||
Err(err) => {
|
||||
terminal_restore_guard.restore_silently();
|
||||
session_log::log_session_end();
|
||||
return Err(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
let app_server_session = match start_app_server(
|
||||
&app_server_target,
|
||||
arg0_paths.clone(),
|
||||
initial_config.clone(),
|
||||
cli_kv_overrides.clone(),
|
||||
loader_overrides.clone(),
|
||||
cloud_requirements.clone(),
|
||||
feedback.clone(),
|
||||
log_db.clone(),
|
||||
state_db.clone(),
|
||||
environment_manager.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(app_server) => AppServerSession::new(app_server),
|
||||
Err(err) => {
|
||||
terminal_restore_guard.restore_silently();
|
||||
session_log::log_session_end();
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
.with_remote_cwd_override(remote_cwd_override.clone());
|
||||
let mut app_server = Some(app_server_session);
|
||||
|
||||
let should_show_trust_screen_flag = !remote_mode && should_show_trust_screen(&initial_config);
|
||||
let mut trust_decision_was_made = false;
|
||||
@@ -1553,8 +1594,7 @@ async fn run_ratatui_app(
|
||||
should_show_trust_screen, // Proxy to: is it a first run in this directory?
|
||||
should_show_trust_screen_flag, // Preserve the startup-time trust NUX signal before onboarding
|
||||
should_prompt_windows_sandbox_nux_at_startup,
|
||||
remote_url,
|
||||
remote_auth_token,
|
||||
remote_endpoint,
|
||||
state_db,
|
||||
environment_manager,
|
||||
startup_hooks_browser,
|
||||
@@ -1788,81 +1828,103 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_remote_addr_accepts_websocket_url() {
|
||||
fn resolve_remote_addr_accepts_websocket_url() {
|
||||
assert_eq!(
|
||||
normalize_remote_addr("ws://127.0.0.1:4500").expect("ws URL should normalize"),
|
||||
"ws://127.0.0.1:4500/"
|
||||
resolve_remote_addr("ws://127.0.0.1:4500").expect("ws URL should normalize"),
|
||||
RemoteAppServerEndpoint::WebSocket {
|
||||
websocket_url: "ws://127.0.0.1:4500/".to_string(),
|
||||
auth_token: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_remote_addr_accepts_secure_websocket_url() {
|
||||
fn resolve_remote_addr_accepts_secure_websocket_url() {
|
||||
assert_eq!(
|
||||
normalize_remote_addr("wss://example.com:443").expect("wss URL should normalize"),
|
||||
"wss://example.com/"
|
||||
resolve_remote_addr("wss://example.com:443").expect("wss URL should normalize"),
|
||||
RemoteAppServerEndpoint::WebSocket {
|
||||
websocket_url: "wss://example.com/".to_string(),
|
||||
auth_token: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_remote_addr_rejects_websocket_url_without_explicit_port() {
|
||||
fn resolve_remote_addr_accepts_default_socket() -> color_eyre::Result<()> {
|
||||
let codex_home = find_codex_home().wrap_err("failed to resolve CODEX_HOME")?;
|
||||
assert_eq!(
|
||||
resolve_remote_addr("unix://")?,
|
||||
RemoteAppServerEndpoint::UnixSocket {
|
||||
socket_path: codex_app_server_client::app_server_control_socket_path(&codex_home)?,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_remote_addr_accepts_relative_socket_path() -> color_eyre::Result<()> {
|
||||
assert_eq!(
|
||||
resolve_remote_addr("unix://codex.sock")?,
|
||||
RemoteAppServerEndpoint::UnixSocket {
|
||||
socket_path: AbsolutePathBuf::relative_to_current_dir("codex.sock")?,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_remote_addr_accepts_absolute_socket_path() -> color_eyre::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let socket_path = temp_dir.path().join("codex.sock");
|
||||
assert_eq!(
|
||||
resolve_remote_addr(&format!("unix://{}", socket_path.display()))?,
|
||||
RemoteAppServerEndpoint::UnixSocket {
|
||||
socket_path: AbsolutePathBuf::from_absolute_path(&socket_path)?,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_remote_addr_rejects_invalid_remote_addresses() {
|
||||
for addr in [
|
||||
"ws://127.0.0.1",
|
||||
"wss://example.com",
|
||||
"ws://user:pass@127.0.0.1",
|
||||
"127.0.0.1:4500",
|
||||
"https://127.0.0.1:4500",
|
||||
] {
|
||||
let err = normalize_remote_addr(addr)
|
||||
.expect_err("websocket URLs without an explicit port should be rejected");
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("expected `ws://host:port` or `wss://host:port`")
|
||||
);
|
||||
let err = resolve_remote_addr(addr).expect_err("invalid remote addresses should fail");
|
||||
assert!(err.to_string().contains(
|
||||
"expected `ws://host:port`, `wss://host:port`, `unix://`, or `unix://PATH`"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_remote_addr_rejects_invalid_input() {
|
||||
let err = normalize_remote_addr("https://127.0.0.1:4500")
|
||||
.expect_err("https URLs should be rejected");
|
||||
#[tokio::test]
|
||||
async fn default_daemon_auto_connect_skips_missing_socket() -> color_eyre::Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("expected `ws://host:port` or `wss://host:port`")
|
||||
maybe_probe_default_daemon_socket(codex_home.path())
|
||||
.await
|
||||
.is_none()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_remote_addr_rejects_host_port_shortcut() {
|
||||
let err =
|
||||
normalize_remote_addr("127.0.0.1:4500").expect_err("host:port should be rejected");
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("expected `ws://host:port` or `wss://host:port`")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_auth_token_transport_accepts_loopback_ws() {
|
||||
validate_remote_auth_token_transport("ws://127.0.0.1:4500/")
|
||||
.expect("loopback ws should be allowed for auth tokens");
|
||||
validate_remote_auth_token_transport("ws://localhost:4500/")
|
||||
.expect("localhost ws should be allowed for auth tokens");
|
||||
validate_remote_auth_token_transport("ws://[::1]:4500/")
|
||||
.expect("ipv6 loopback ws should be allowed for auth tokens");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_auth_token_transport_accepts_secure_wss() {
|
||||
validate_remote_auth_token_transport("wss://example.com:443/")
|
||||
.expect("wss should be allowed for auth tokens");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_auth_token_transport_rejects_non_loopback_ws() {
|
||||
let err = validate_remote_auth_token_transport("ws://example.com:4500/")
|
||||
.expect_err("non-loopback ws should be rejected for auth tokens");
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("remote auth tokens require `wss://` or loopback `ws://` URLs")
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn default_daemon_auto_connect_probes_socket_only() -> color_eyre::Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let socket_path =
|
||||
codex_app_server_client::app_server_control_socket_path(codex_home.path())?;
|
||||
std::fs::create_dir_all(socket_path.as_path().parent().expect("socket parent"))?;
|
||||
let _listener = tokio::net::UnixListener::bind(socket_path.as_path())?;
|
||||
|
||||
assert_eq!(
|
||||
maybe_probe_default_daemon_socket(codex_home.path()).await,
|
||||
Some(socket_path)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -2109,8 +2171,9 @@ mod tests {
|
||||
Path::new("/definitely/not/local/to/this/test")
|
||||
};
|
||||
let target = AppServerTarget::Remote {
|
||||
websocket_url: "ws://127.0.0.1:1234/".to_string(),
|
||||
auth_token: None,
|
||||
endpoint: RemoteAppServerEndpoint::UnixSocket {
|
||||
socket_path: AbsolutePathBuf::relative_to_current_dir("codex.sock")?,
|
||||
},
|
||||
};
|
||||
let environment_manager = EnvironmentManager::default_for_tests();
|
||||
|
||||
|
||||
@@ -57,8 +57,7 @@ fn main() -> anyhow::Result<()> {
|
||||
inner,
|
||||
arg0_paths,
|
||||
LoaderOverrides::default(),
|
||||
/*remote*/ None,
|
||||
/*remote_auth_token*/ None,
|
||||
/*explicit_remote_endpoint*/ None,
|
||||
)
|
||||
.await?;
|
||||
match exit_info.exit_reason {
|
||||
|
||||
Reference in New Issue
Block a user