From 38e648ca67802d5f2fb23f9b3bd3f200cdb067fa Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Sun, 29 Mar 2026 12:19:34 -0600 Subject: [PATCH] Fix tui_app_server ghost subagent entries in /agent (#16110) Fixes #16092 The app-server-backed TUI could accumulate ghost subagent entries in `/agent` after resume/backfill flows. Some of those rows were no longer live according to the backend, but still appeared selectable in the picker and could open as blank threads. *Cause* Unlike the legacy tui behavior, tui_app_server was creating local picker/replay state for subagents discovered through metadata refresh and loaded-thread backfill, even when no real local session or transcript had been attached. That let stale ids survive in the picker as if they were replayable threads. *Fix* Stop creating empty local thread channels during subagent metadata hydration and loaded-thread backfill. When opening /agent, prune metadata-only entries that thread/read reports as terminally unavailable. When selecting a discovered subagent that is still live but not yet locally attached, materialize a real local session on demand from thread/read instead of falling back to an empty replay state. --- codex-rs/tui/src/app.rs | 428 ++++++++++++++++++++--- codex-rs/tui/src/app/agent_navigation.rs | 18 + 2 files changed, 395 insertions(+), 51 deletions(-) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 3945f32f5..5ea10e70e 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -2353,7 +2353,6 @@ impl App { .await { Ok(thread) => { - self.ensure_thread_channel(thread_id); self.upsert_agent_picker_thread( thread_id, thread.agent_nickname, @@ -2627,51 +2626,18 @@ impl App { /// historical id now" and converted into closed picker entries instead of deleting them, so /// the stable traversal order remains intact for review and keyboard navigation. async fn open_agent_picker(&mut self, app_server: &mut AppServerSession) { - let thread_ids: Vec = self.thread_event_channels.keys().cloned().collect(); + let mut thread_ids = self.agent_navigation.tracked_thread_ids(); + for thread_id in self.thread_event_channels.keys().copied() { + if !thread_ids.contains(&thread_id) { + thread_ids.push(thread_id); + } + } for thread_id in thread_ids { - let existing_entry = self.agent_navigation.get(&thread_id).cloned(); - match app_server - .thread_read(thread_id, /*include_turns*/ false) + if !self + .refresh_agent_picker_thread_liveness(app_server, thread_id) .await { - Ok(thread) => { - self.upsert_agent_picker_thread( - thread_id, - thread.agent_nickname.or_else(|| { - existing_entry - .as_ref() - .and_then(|entry| entry.agent_nickname.clone()) - }), - thread.agent_role.or_else(|| { - existing_entry - .as_ref() - .and_then(|entry| entry.agent_role.clone()) - }), - matches!( - thread.status, - codex_app_server_protocol::ThreadStatus::NotLoaded - ), - ); - } - Err(err) => { - let is_closed = Self::closed_state_for_thread_read_error( - &err, - existing_entry.as_ref().map(|entry| entry.is_closed), - ); - if let Some(entry) = existing_entry { - self.upsert_agent_picker_thread( - thread_id, - entry.agent_nickname, - entry.agent_role, - is_closed, - ); - } else { - self.upsert_agent_picker_thread( - thread_id, /*agent_nickname*/ None, /*agent_role*/ None, - is_closed, - ); - } - } + continue; } } @@ -2744,6 +2710,14 @@ impl App { Self::is_terminal_thread_read_error(err) || existing_is_closed.unwrap_or(false) } + fn can_fallback_from_include_turns_error(err: &color_eyre::Report) -> bool { + err.chain().any(|cause| { + let message = cause.to_string(); + message.contains("includeTurns is unavailable before first user message") + || message.contains("ephemeral threads do not support includeTurns") + }) + } + /// Updates cached picker metadata and then mirrors any visible-label change into the footer. /// /// These two writes stay paired so the picker rows and contextual footer continue to describe @@ -2774,6 +2748,168 @@ impl App { self.sync_active_agent_label(); } + async fn refresh_agent_picker_thread_liveness( + &mut self, + app_server: &mut AppServerSession, + thread_id: ThreadId, + ) -> bool { + let existing_entry = self.agent_navigation.get(&thread_id).cloned(); + let has_replay_channel = self.thread_event_channels.contains_key(&thread_id); + match app_server + .thread_read(thread_id, /*include_turns*/ false) + .await + { + Ok(thread) => { + self.upsert_agent_picker_thread( + thread_id, + thread.agent_nickname.or_else(|| { + existing_entry + .as_ref() + .and_then(|entry| entry.agent_nickname.clone()) + }), + thread.agent_role.or_else(|| { + existing_entry + .as_ref() + .and_then(|entry| entry.agent_role.clone()) + }), + matches!( + thread.status, + codex_app_server_protocol::ThreadStatus::NotLoaded + ), + ); + true + } + Err(err) => { + if Self::is_terminal_thread_read_error(&err) && !has_replay_channel { + self.agent_navigation.remove(thread_id); + return false; + } + let is_closed = Self::closed_state_for_thread_read_error( + &err, + existing_entry.as_ref().map(|entry| entry.is_closed), + ); + if let Some(entry) = existing_entry { + self.upsert_agent_picker_thread( + thread_id, + entry.agent_nickname, + entry.agent_role, + is_closed, + ); + } else { + self.upsert_agent_picker_thread( + thread_id, /*agent_nickname*/ None, /*agent_role*/ None, + is_closed, + ); + } + true + } + } + } + + async fn session_state_for_thread_read( + &self, + thread_id: ThreadId, + thread: &codex_app_server_protocol::Thread, + ) -> ThreadSessionState { + let mut session = self + .primary_session_configured + .clone() + .unwrap_or(ThreadSessionState { + thread_id, + forked_from_id: None, + thread_name: None, + model: self.chat_widget.current_model().to_string(), + model_provider_id: self.config.model_provider_id.clone(), + service_tier: self.chat_widget.current_service_tier(), + approval_policy: self.config.permissions.approval_policy.value(), + approvals_reviewer: self.config.approvals_reviewer, + sandbox_policy: self.config.permissions.sandbox_policy.get().clone(), + cwd: thread.cwd.clone(), + reasoning_effort: self.chat_widget.current_reasoning_effort(), + history_log_id: 0, + history_entry_count: 0, + network_proxy: None, + rollout_path: thread.path.clone(), + }); + session.thread_id = thread_id; + session.thread_name = thread.name.clone(); + session.model_provider_id = thread.model_provider.clone(); + session.cwd = thread.cwd.clone(); + session.rollout_path = thread.path.clone(); + if let Some(model) = + read_session_model(&self.config, thread_id, thread.path.as_deref()).await + { + session.model = model; + } else if thread.path.is_some() { + session.model.clear(); + } + session.history_log_id = 0; + session.history_entry_count = 0; + session + } + + /// Materializes a live thread into local replay state when the picker knows about it but the + /// TUI has not cached a local event channel yet. + /// + /// Resume-time backfill intentionally avoids creating empty placeholder channels, because those + /// placeholders make stale `/agent` entries open blank transcripts. When a user later selects a + /// still-live discovered thread, attach it on demand with a real resumed snapshot. + async fn attach_live_thread_for_selection( + &mut self, + app_server: &mut AppServerSession, + thread_id: ThreadId, + ) -> Result { + if self.thread_event_channels.contains_key(&thread_id) { + return Ok(true); + } + + let (session, turns, live_attached) = match app_server + .resume_thread(self.config.clone(), thread_id) + .await + { + Ok(started) => (started.session, started.turns, true), + Err(resume_err) => { + tracing::warn!( + thread_id = %thread_id, + error = %resume_err, + "failed to resume live thread for selection; falling back to thread/read" + ); + let (thread, turns) = match app_server + .thread_read(thread_id, /*include_turns*/ true) + .await + { + Ok(thread) => { + let turns = thread.turns.clone(); + (thread, turns) + } + Err(err) if Self::can_fallback_from_include_turns_error(&err) => { + let thread = app_server + .thread_read(thread_id, /*include_turns*/ false) + .await?; + (thread, Vec::new()) + } + Err(err) => return Err(err), + }; + if turns.is_empty() { + // A `thread/read` fallback without turns would create a blank local replay + // channel with no live listener attached, which blocks later real re-attach. + return Err(color_eyre::eyre::eyre!( + "Agent thread {thread_id} is not yet available for replay or live attach." + )); + } + let mut session = self.session_state_for_thread_read(thread_id, &thread).await; + // `thread/read` can seed replay state, but it does not attach the app-server + // listener that `thread/resume` establishes, so treat this path as replay-only. + session.model.clear(); + (session, turns, false) + } + }; + let channel = self.ensure_thread_channel(thread_id); + let mut store = channel.store.lock().await; + store.set_session(session, turns); + Ok(live_attached) + } + /// Replaces the chat widget and re-seeds the new widget's collab metadata from the navigation /// cache. /// @@ -2810,15 +2946,43 @@ impl App { return Ok(()); } - if !self.thread_event_channels.contains_key(&thread_id) { + if !self + .refresh_agent_picker_thread_liveness(app_server, thread_id) + .await + { self.chat_widget - .add_error_message(format!("Failed to attach to agent thread {thread_id}.")); + .add_error_message(format!("Agent thread {thread_id} is no longer available.")); return Ok(()); } - let is_replay_only = self + + let mut is_replay_only = self .agent_navigation .get(&thread_id) .is_some_and(|entry| entry.is_closed); + let mut attached_replay_only = false; + if self.should_attach_live_thread_for_selection(thread_id) { + match self + .attach_live_thread_for_selection(app_server, thread_id) + .await + { + Ok(live_attached) => { + attached_replay_only = !live_attached; + if attached_replay_only { + is_replay_only = true; + } + } + Err(err) => { + self.chat_widget.add_error_message(format!( + "Failed to attach to agent thread {thread_id}: {err}" + )); + return Ok(()); + } + } + } else if !self.thread_event_channels.contains_key(&thread_id) && is_replay_only { + self.chat_widget + .add_error_message(format!("Agent thread {thread_id} is no longer available.")); + return Ok(()); + } let previous_thread_id = self.active_thread_id; self.store_active_thread_receiver().await; @@ -2850,10 +3014,14 @@ impl App { self.reset_for_thread_switch(tui)?; self.replay_thread_snapshot(snapshot, !is_replay_only); if is_replay_only { - self.chat_widget.add_info_message( - format!("Agent thread {thread_id} is closed. Replaying saved transcript."), - /*hint*/ None, - ); + let message = if attached_replay_only { + format!( + "Agent thread {thread_id} could not be resumed live. Replaying saved transcript." + ) + } else { + format!("Agent thread {thread_id} is closed. Replaying saved transcript.") + }; + self.chat_widget.add_info_message(message, /*hint*/ None); } self.drain_active_thread_events(tui).await?; self.refresh_pending_thread_approvals().await; @@ -2861,6 +3029,14 @@ impl App { Ok(()) } + fn should_attach_live_thread_for_selection(&self, thread_id: ThreadId) -> bool { + !self.thread_event_channels.contains_key(&thread_id) + && self + .agent_navigation + .get(&thread_id) + .is_none_or(|entry| !entry.is_closed) + } + fn reset_for_thread_switch(&mut self, tui: &mut tui::Tui) -> Result<()> { self.overlay = None; self.transcript_cells.clear(); @@ -3014,7 +3190,6 @@ impl App { } for thread in find_loaded_subagent_threads_for_primary(threads, primary_thread_id) { - self.ensure_thread_channel(thread.thread_id); self.upsert_agent_picker_thread( thread.thread_id, thread.agent_nickname, @@ -6971,6 +7146,28 @@ mod tests { Ok(()) } + #[tokio::test] + async fn open_agent_picker_prunes_terminal_metadata_only_threads() -> Result<()> { + let mut app = make_test_app().await; + let mut app_server = + crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) + .await + .expect("embedded app server"); + let thread_id = ThreadId::new(); + app.agent_navigation.upsert( + thread_id, + Some("Ghost".to_string()), + Some("worker".to_string()), + /*is_closed*/ false, + ); + + app.open_agent_picker(&mut app_server).await; + + assert_eq!(app.agent_navigation.get(&thread_id), None); + assert!(app.agent_navigation.is_empty()); + Ok(()) + } + #[tokio::test] async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> { let mut app = make_test_app().await; @@ -7041,6 +7238,19 @@ mod tests { )); } + #[test] + fn include_turns_fallback_detection_handles_unmaterialized_and_ephemeral_threads() { + let unmaterialized = color_eyre::eyre::eyre!( + "thread/read failed during TUI session lookup: thread/read failed: thread thr_123 is not materialized yet; includeTurns is unavailable before first user message" + ); + let ephemeral = color_eyre::eyre::eyre!( + "thread/read failed during TUI session lookup: thread/read failed: ephemeral threads do not support includeTurns" + ); + + assert!(App::can_fallback_from_include_turns_error(&unmaterialized)); + assert!(App::can_fallback_from_include_turns_error(&ephemeral)); + } + #[tokio::test] async fn open_agent_picker_marks_loaded_threads_open() -> Result<()> { let mut app = make_test_app().await; @@ -7068,6 +7278,122 @@ mod tests { Ok(()) } + #[tokio::test] + async fn attach_live_thread_for_selection_rejects_empty_non_ephemeral_fallback_threads() + -> Result<()> { + let mut app = make_test_app().await; + let mut app_server = + crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) + .await + .expect("embedded app server"); + let started = app_server + .start_thread(app.chat_widget.config_ref()) + .await?; + let thread_id = started.session.thread_id; + app.agent_navigation.upsert( + thread_id, + Some("Scout".to_string()), + Some("worker".to_string()), + /*is_closed*/ false, + ); + + let err = app + .attach_live_thread_for_selection(&mut app_server, thread_id) + .await + .expect_err("empty fallback should not attach as a blank replay-only thread"); + + assert_eq!( + err.to_string(), + format!("Agent thread {thread_id} is not yet available for replay or live attach.") + ); + assert!(!app.thread_event_channels.contains_key(&thread_id)); + Ok(()) + } + + #[tokio::test] + async fn attach_live_thread_for_selection_rejects_unmaterialized_fallback_threads() -> Result<()> + { + let mut app = make_test_app().await; + let mut app_server = + crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) + .await + .expect("embedded app server"); + let mut ephemeral_config = app.chat_widget.config_ref().clone(); + ephemeral_config.ephemeral = true; + let started = app_server.start_thread(&ephemeral_config).await?; + let thread_id = started.session.thread_id; + app.agent_navigation.upsert( + thread_id, + Some("Scout".to_string()), + Some("worker".to_string()), + /*is_closed*/ false, + ); + + let err = app + .attach_live_thread_for_selection(&mut app_server, thread_id) + .await + .expect_err("ephemeral fallback should not attach as a blank live thread"); + + assert_eq!( + err.to_string(), + format!("Agent thread {thread_id} is not yet available for replay or live attach.") + ); + assert!(!app.thread_event_channels.contains_key(&thread_id)); + Ok(()) + } + + #[tokio::test] + async fn should_attach_live_thread_for_selection_skips_closed_metadata_only_threads() { + let mut app = make_test_app().await; + let thread_id = ThreadId::new(); + app.agent_navigation.upsert( + thread_id, + Some("Ghost".to_string()), + Some("worker".to_string()), + /*is_closed*/ true, + ); + + assert!(!app.should_attach_live_thread_for_selection(thread_id)); + + app.agent_navigation.upsert( + thread_id, + Some("Ghost".to_string()), + Some("worker".to_string()), + /*is_closed*/ false, + ); + assert!(app.should_attach_live_thread_for_selection(thread_id)); + + app.thread_event_channels + .insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1)); + assert!(!app.should_attach_live_thread_for_selection(thread_id)); + } + + #[tokio::test] + async fn refresh_agent_picker_thread_liveness_prunes_closed_metadata_only_threads() -> Result<()> + { + let mut app = make_test_app().await; + let mut app_server = + crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()) + .await + .expect("embedded app server"); + let thread_id = ThreadId::new(); + app.agent_navigation.upsert( + thread_id, + Some("Ghost".to_string()), + Some("worker".to_string()), + /*is_closed*/ false, + ); + + let is_available = app + .refresh_agent_picker_thread_liveness(&mut app_server, thread_id) + .await; + + assert!(!is_available); + assert_eq!(app.agent_navigation.get(&thread_id), None); + assert!(!app.thread_event_channels.contains_key(&thread_id)); + Ok(()) + } + #[tokio::test] async fn open_agent_picker_prompts_to_enable_multi_agent_when_disabled() -> Result<()> { let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; diff --git a/codex-rs/tui/src/app/agent_navigation.rs b/codex-rs/tui/src/app/agent_navigation.rs index c97c93f38..db9e47c5b 100644 --- a/codex-rs/tui/src/app/agent_navigation.rs +++ b/codex-rs/tui/src/app/agent_navigation.rs @@ -122,6 +122,16 @@ impl AgentNavigationState { self.order.clear(); } + /// Removes a tracked thread entirely from picker metadata and traversal order. + /// + /// This is reserved for entries that were only discovered opportunistically and never became + /// replayable local threads. Keeping those around after the backend confirms they are gone + /// would leave ghost rows in `/agent`. + pub(crate) fn remove(&mut self, thread_id: ThreadId) { + self.threads.remove(&thread_id); + self.order.retain(|candidate| *candidate != thread_id); + } + /// Returns whether there is at least one tracked thread other than the primary one. /// /// `App` uses this to decide whether the picker should be available even when the collaboration @@ -145,6 +155,14 @@ impl AgentNavigationState { .collect() } + /// Returns tracked thread ids in the same stable order used by the picker. + pub(crate) fn tracked_thread_ids(&self) -> Vec { + self.ordered_threads() + .into_iter() + .map(|(thread_id, _)| thread_id) + .collect() + } + /// Returns the adjacent thread id for keyboard navigation in stable spawn order. /// /// The caller must pass the thread whose transcript is actually being shown to the user, not