TUI: Keep remote app-server events draining (#18932)

Addresses #18860

Problem: Remote app-server clients could stop draining websocket events
when their bounded local event channel filled, leaving clients stuck on
stale in-progress turns after a disconnect.

Solution: Use an unbounded local event channel for the remote client so
the websocket reader can keep forwarding disconnect and progress events
instead of blocking or dropping them.

Why this is reasonable: This does not make the remote websocket itself
unbounded. The changed queue lives inside the remote client, between the
task that reads the remote websocket and the API consumer in the same
client process. Once an event has been received from the remote server,
preserving it is preferable to blocking websocket reads or dropping
disconnect/lifecycle events; network-level backpressure still happens at
the websocket boundary if the remote side outpaces the client.
This commit is contained in:
Eric Traut
2026-04-22 09:29:34 -07:00
committed by GitHub
Unverified
parent 0127cef5db
commit 79ea577156
+15 -158
View File
@@ -20,7 +20,6 @@ use crate::RequestResult;
use crate::SHUTDOWN_TIMEOUT;
use crate::TypedRequestError;
use crate::request_method_name;
use crate::server_notification_requires_delivery;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
@@ -126,7 +125,7 @@ enum RemoteClientCommand {
pub struct RemoteAppServerClient {
command_tx: mpsc::Sender<RemoteClientCommand>,
event_rx: mpsc::Receiver<AppServerEvent>,
event_rx: mpsc::UnboundedReceiver<AppServerEvent>,
pending_events: VecDeque<AppServerEvent>,
worker_handle: tokio::task::JoinHandle<()>,
}
@@ -195,11 +194,10 @@ impl RemoteAppServerClient {
.await?;
let (command_tx, mut command_rx) = mpsc::channel::<RemoteClientCommand>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<AppServerEvent>(channel_capacity);
let (event_tx, event_rx) = mpsc::unbounded_channel::<AppServerEvent>();
let worker_handle = tokio::spawn(async move {
let mut pending_requests =
HashMap::<RequestId, oneshot::Sender<IoResult<RequestResult>>>::new();
let mut skipped_events = 0usize;
loop {
tokio::select! {
command = command_rx.recv() => {
@@ -231,15 +229,12 @@ impl RemoteAppServerClient {
}
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` write failed: {err_message}"
),
},
&mut stream,
)
.await;
);
break;
}
}
@@ -316,11 +311,8 @@ impl RemoteAppServerClient {
app_server_event_from_notification(notification)
&& let Err(err) = deliver_event(
&event_tx,
&mut skipped_events,
event,
&mut stream,
)
.await
{
warn!(%err, "failed to deliver remote app-server event");
break;
@@ -333,11 +325,8 @@ impl RemoteAppServerClient {
Ok(request) => {
if let Err(err) = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::ServerRequest(request),
&mut stream,
)
.await
{
warn!(%err, "failed to deliver remote app-server server request");
break;
@@ -364,15 +353,12 @@ impl RemoteAppServerClient {
let err_message = reject_err.to_string();
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` write failed: {err_message}"
),
},
&mut stream,
)
.await;
);
break;
}
}
@@ -381,15 +367,12 @@ impl RemoteAppServerClient {
Err(err) => {
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` sent invalid JSON-RPC: {err}"
),
},
&mut stream,
)
.await;
);
break;
}
}
@@ -402,15 +385,12 @@ impl RemoteAppServerClient {
.unwrap_or_else(|| "connection closed".to_string());
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` disconnected: {reason}"
),
},
&mut stream,
)
.await;
);
break;
}
Some(Ok(Message::Binary(_)))
@@ -420,29 +400,23 @@ impl RemoteAppServerClient {
Some(Err(err)) => {
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` transport failed: {err}"
),
},
&mut stream,
)
.await;
);
break;
}
None => {
let _ = deliver_event(
&event_tx,
&mut skipped_events,
AppServerEvent::Disconnected {
message: format!(
"remote app server at `{websocket_url}` closed the connection"
),
},
&mut stream,
)
.await;
);
break;
}
}
@@ -801,100 +775,16 @@ fn app_server_event_from_notification(notification: JSONRPCNotification) -> Opti
}
}
async fn deliver_event(
event_tx: &mpsc::Sender<AppServerEvent>,
skipped_events: &mut usize,
fn deliver_event(
event_tx: &mpsc::UnboundedSender<AppServerEvent>,
event: AppServerEvent,
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
) -> IoResult<()> {
if *skipped_events > 0 {
if event_requires_delivery(&event) {
if event_tx
.send(AppServerEvent::Lagged {
skipped: *skipped_events,
})
.await
.is_err()
{
return Err(IoError::new(
ErrorKind::BrokenPipe,
"remote app-server event consumer channel is closed",
));
}
*skipped_events = 0;
} else {
match event_tx.try_send(AppServerEvent::Lagged {
skipped: *skipped_events,
}) {
Ok(()) => *skipped_events = 0,
Err(mpsc::error::TrySendError::Full(_)) => {
*skipped_events = (*skipped_events).saturating_add(1);
reject_if_server_request_dropped(stream, &event).await?;
return Ok(());
}
Err(mpsc::error::TrySendError::Closed(_)) => {
return Err(IoError::new(
ErrorKind::BrokenPipe,
"remote app-server event consumer channel is closed",
));
}
}
}
}
if event_requires_delivery(&event) {
event_tx.send(event).await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server event consumer channel is closed",
)
})?;
return Ok(());
}
match event_tx.try_send(event) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(event)) => {
*skipped_events = (*skipped_events).saturating_add(1);
reject_if_server_request_dropped(stream, &event).await
}
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
event_tx.send(event).map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"remote app-server event consumer channel is closed",
)),
}
}
async fn reject_if_server_request_dropped(
stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
event: &AppServerEvent,
) -> IoResult<()> {
let AppServerEvent::ServerRequest(request) = event else {
return Ok(());
};
write_jsonrpc_message(
stream,
JSONRPCMessage::Error(JSONRPCError {
error: JSONRPCErrorError {
code: -32001,
message: "remote app-server event queue is full".to_string(),
data: None,
},
id: request.id().clone(),
}),
"<remote-app-server>",
)
.await
}
fn event_requires_delivery(event: &AppServerEvent) -> bool {
match event {
AppServerEvent::ServerNotification(notification) => {
server_notification_requires_delivery(notification)
}
AppServerEvent::Disconnected { .. } => true,
AppServerEvent::Lagged { .. } | AppServerEvent::ServerRequest(_) => false,
}
)
})
}
fn request_id_from_client_request(request: &ClientRequest) -> RequestId {
@@ -940,47 +830,14 @@ async fn write_jsonrpc_message(
))
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn event_requires_delivery_marks_transcript_and_disconnect_events() {
assert!(event_requires_delivery(
&AppServerEvent::ServerNotification(ServerNotification::AgentMessageDelta(
codex_app_server_protocol::AgentMessageDeltaNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item_id: "item".to_string(),
delta: "hello".to_string(),
},
),)
));
assert!(event_requires_delivery(
&AppServerEvent::ServerNotification(ServerNotification::ItemCompleted(
codex_app_server_protocol::ItemCompletedNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
item: codex_app_server_protocol::ThreadItem::Plan {
id: "item".to_string(),
text: "step".to_string(),
},
}
),)
));
assert!(event_requires_delivery(&AppServerEvent::Disconnected {
message: "closed".to_string(),
}));
assert!(!event_requires_delivery(&AppServerEvent::Lagged {
skipped: 1
}));
}
#[tokio::test]
async fn shutdown_tolerates_worker_exit_after_command_is_queued() {
let (command_tx, mut command_rx) = mpsc::channel(1);
let (_event_tx, event_rx) = mpsc::channel(1);
let (_event_tx, event_rx) = mpsc::unbounded_channel::<AppServerEvent>();
let worker_handle = tokio::spawn(async move {
let _ = command_rx.recv().await;
});