Fix tui_app_server ghost subagent entries in /agent (#16110)

Fixes #16092

The app-server-backed TUI could accumulate ghost subagent entries in
`/agent` after resume/backfill flows. Some of those rows were no longer
live according to the backend, but still appeared selectable in the
picker and could open as blank threads.

*Cause*
Unlike the legacy tui behavior, tui_app_server was creating local
picker/replay state for subagents discovered through metadata refresh
and loaded-thread backfill, even when no real local session or
transcript had been attached. That let stale ids survive in the picker
as if they were replayable threads.

*Fix*
Stop creating empty local thread channels during subagent metadata
hydration and loaded-thread backfill.
When opening /agent, prune metadata-only entries that thread/read
reports as terminally unavailable.
When selecting a discovered subagent that is still live but not yet
locally attached, materialize a real local session on demand from
thread/read instead of falling back to an empty replay state.
This commit is contained in:
Eric Traut
2026-03-29 12:19:34 -06:00
committed by GitHub
Unverified
parent 54d3ad1ede
commit 38e648ca67
2 changed files with 395 additions and 51 deletions
+377 -51
View File
@@ -2353,7 +2353,6 @@ impl App {
.await
{
Ok(thread) => {
self.ensure_thread_channel(thread_id);
self.upsert_agent_picker_thread(
thread_id,
thread.agent_nickname,
@@ -2627,51 +2626,18 @@ impl App {
/// historical id now" and converted into closed picker entries instead of deleting them, so
/// the stable traversal order remains intact for review and keyboard navigation.
async fn open_agent_picker(&mut self, app_server: &mut AppServerSession) {
let thread_ids: Vec<ThreadId> = self.thread_event_channels.keys().cloned().collect();
let mut thread_ids = self.agent_navigation.tracked_thread_ids();
for thread_id in self.thread_event_channels.keys().copied() {
if !thread_ids.contains(&thread_id) {
thread_ids.push(thread_id);
}
}
for thread_id in thread_ids {
let existing_entry = self.agent_navigation.get(&thread_id).cloned();
match app_server
.thread_read(thread_id, /*include_turns*/ false)
if !self
.refresh_agent_picker_thread_liveness(app_server, thread_id)
.await
{
Ok(thread) => {
self.upsert_agent_picker_thread(
thread_id,
thread.agent_nickname.or_else(|| {
existing_entry
.as_ref()
.and_then(|entry| entry.agent_nickname.clone())
}),
thread.agent_role.or_else(|| {
existing_entry
.as_ref()
.and_then(|entry| entry.agent_role.clone())
}),
matches!(
thread.status,
codex_app_server_protocol::ThreadStatus::NotLoaded
),
);
}
Err(err) => {
let is_closed = Self::closed_state_for_thread_read_error(
&err,
existing_entry.as_ref().map(|entry| entry.is_closed),
);
if let Some(entry) = existing_entry {
self.upsert_agent_picker_thread(
thread_id,
entry.agent_nickname,
entry.agent_role,
is_closed,
);
} else {
self.upsert_agent_picker_thread(
thread_id, /*agent_nickname*/ None, /*agent_role*/ None,
is_closed,
);
}
}
continue;
}
}
@@ -2744,6 +2710,14 @@ impl App {
Self::is_terminal_thread_read_error(err) || existing_is_closed.unwrap_or(false)
}
fn can_fallback_from_include_turns_error(err: &color_eyre::Report) -> bool {
err.chain().any(|cause| {
let message = cause.to_string();
message.contains("includeTurns is unavailable before first user message")
|| message.contains("ephemeral threads do not support includeTurns")
})
}
/// Updates cached picker metadata and then mirrors any visible-label change into the footer.
///
/// These two writes stay paired so the picker rows and contextual footer continue to describe
@@ -2774,6 +2748,168 @@ impl App {
self.sync_active_agent_label();
}
async fn refresh_agent_picker_thread_liveness(
&mut self,
app_server: &mut AppServerSession,
thread_id: ThreadId,
) -> bool {
let existing_entry = self.agent_navigation.get(&thread_id).cloned();
let has_replay_channel = self.thread_event_channels.contains_key(&thread_id);
match app_server
.thread_read(thread_id, /*include_turns*/ false)
.await
{
Ok(thread) => {
self.upsert_agent_picker_thread(
thread_id,
thread.agent_nickname.or_else(|| {
existing_entry
.as_ref()
.and_then(|entry| entry.agent_nickname.clone())
}),
thread.agent_role.or_else(|| {
existing_entry
.as_ref()
.and_then(|entry| entry.agent_role.clone())
}),
matches!(
thread.status,
codex_app_server_protocol::ThreadStatus::NotLoaded
),
);
true
}
Err(err) => {
if Self::is_terminal_thread_read_error(&err) && !has_replay_channel {
self.agent_navigation.remove(thread_id);
return false;
}
let is_closed = Self::closed_state_for_thread_read_error(
&err,
existing_entry.as_ref().map(|entry| entry.is_closed),
);
if let Some(entry) = existing_entry {
self.upsert_agent_picker_thread(
thread_id,
entry.agent_nickname,
entry.agent_role,
is_closed,
);
} else {
self.upsert_agent_picker_thread(
thread_id, /*agent_nickname*/ None, /*agent_role*/ None,
is_closed,
);
}
true
}
}
}
async fn session_state_for_thread_read(
&self,
thread_id: ThreadId,
thread: &codex_app_server_protocol::Thread,
) -> ThreadSessionState {
let mut session = self
.primary_session_configured
.clone()
.unwrap_or(ThreadSessionState {
thread_id,
forked_from_id: None,
thread_name: None,
model: self.chat_widget.current_model().to_string(),
model_provider_id: self.config.model_provider_id.clone(),
service_tier: self.chat_widget.current_service_tier(),
approval_policy: self.config.permissions.approval_policy.value(),
approvals_reviewer: self.config.approvals_reviewer,
sandbox_policy: self.config.permissions.sandbox_policy.get().clone(),
cwd: thread.cwd.clone(),
reasoning_effort: self.chat_widget.current_reasoning_effort(),
history_log_id: 0,
history_entry_count: 0,
network_proxy: None,
rollout_path: thread.path.clone(),
});
session.thread_id = thread_id;
session.thread_name = thread.name.clone();
session.model_provider_id = thread.model_provider.clone();
session.cwd = thread.cwd.clone();
session.rollout_path = thread.path.clone();
if let Some(model) =
read_session_model(&self.config, thread_id, thread.path.as_deref()).await
{
session.model = model;
} else if thread.path.is_some() {
session.model.clear();
}
session.history_log_id = 0;
session.history_entry_count = 0;
session
}
/// Materializes a live thread into local replay state when the picker knows about it but the
/// TUI has not cached a local event channel yet.
///
/// Resume-time backfill intentionally avoids creating empty placeholder channels, because those
/// placeholders make stale `/agent` entries open blank transcripts. When a user later selects a
/// still-live discovered thread, attach it on demand with a real resumed snapshot.
async fn attach_live_thread_for_selection(
&mut self,
app_server: &mut AppServerSession,
thread_id: ThreadId,
) -> Result<bool> {
if self.thread_event_channels.contains_key(&thread_id) {
return Ok(true);
}
let (session, turns, live_attached) = match app_server
.resume_thread(self.config.clone(), thread_id)
.await
{
Ok(started) => (started.session, started.turns, true),
Err(resume_err) => {
tracing::warn!(
thread_id = %thread_id,
error = %resume_err,
"failed to resume live thread for selection; falling back to thread/read"
);
let (thread, turns) = match app_server
.thread_read(thread_id, /*include_turns*/ true)
.await
{
Ok(thread) => {
let turns = thread.turns.clone();
(thread, turns)
}
Err(err) if Self::can_fallback_from_include_turns_error(&err) => {
let thread = app_server
.thread_read(thread_id, /*include_turns*/ false)
.await?;
(thread, Vec::new())
}
Err(err) => return Err(err),
};
if turns.is_empty() {
// A `thread/read` fallback without turns would create a blank local replay
// channel with no live listener attached, which blocks later real re-attach.
return Err(color_eyre::eyre::eyre!(
"Agent thread {thread_id} is not yet available for replay or live attach."
));
}
let mut session = self.session_state_for_thread_read(thread_id, &thread).await;
// `thread/read` can seed replay state, but it does not attach the app-server
// listener that `thread/resume` establishes, so treat this path as replay-only.
session.model.clear();
(session, turns, false)
}
};
let channel = self.ensure_thread_channel(thread_id);
let mut store = channel.store.lock().await;
store.set_session(session, turns);
Ok(live_attached)
}
/// Replaces the chat widget and re-seeds the new widget's collab metadata from the navigation
/// cache.
///
@@ -2810,15 +2946,43 @@ impl App {
return Ok(());
}
if !self.thread_event_channels.contains_key(&thread_id) {
if !self
.refresh_agent_picker_thread_liveness(app_server, thread_id)
.await
{
self.chat_widget
.add_error_message(format!("Failed to attach to agent thread {thread_id}."));
.add_error_message(format!("Agent thread {thread_id} is no longer available."));
return Ok(());
}
let is_replay_only = self
let mut is_replay_only = self
.agent_navigation
.get(&thread_id)
.is_some_and(|entry| entry.is_closed);
let mut attached_replay_only = false;
if self.should_attach_live_thread_for_selection(thread_id) {
match self
.attach_live_thread_for_selection(app_server, thread_id)
.await
{
Ok(live_attached) => {
attached_replay_only = !live_attached;
if attached_replay_only {
is_replay_only = true;
}
}
Err(err) => {
self.chat_widget.add_error_message(format!(
"Failed to attach to agent thread {thread_id}: {err}"
));
return Ok(());
}
}
} else if !self.thread_event_channels.contains_key(&thread_id) && is_replay_only {
self.chat_widget
.add_error_message(format!("Agent thread {thread_id} is no longer available."));
return Ok(());
}
let previous_thread_id = self.active_thread_id;
self.store_active_thread_receiver().await;
@@ -2850,10 +3014,14 @@ impl App {
self.reset_for_thread_switch(tui)?;
self.replay_thread_snapshot(snapshot, !is_replay_only);
if is_replay_only {
self.chat_widget.add_info_message(
format!("Agent thread {thread_id} is closed. Replaying saved transcript."),
/*hint*/ None,
);
let message = if attached_replay_only {
format!(
"Agent thread {thread_id} could not be resumed live. Replaying saved transcript."
)
} else {
format!("Agent thread {thread_id} is closed. Replaying saved transcript.")
};
self.chat_widget.add_info_message(message, /*hint*/ None);
}
self.drain_active_thread_events(tui).await?;
self.refresh_pending_thread_approvals().await;
@@ -2861,6 +3029,14 @@ impl App {
Ok(())
}
fn should_attach_live_thread_for_selection(&self, thread_id: ThreadId) -> bool {
!self.thread_event_channels.contains_key(&thread_id)
&& self
.agent_navigation
.get(&thread_id)
.is_none_or(|entry| !entry.is_closed)
}
fn reset_for_thread_switch(&mut self, tui: &mut tui::Tui) -> Result<()> {
self.overlay = None;
self.transcript_cells.clear();
@@ -3014,7 +3190,6 @@ impl App {
}
for thread in find_loaded_subagent_threads_for_primary(threads, primary_thread_id) {
self.ensure_thread_channel(thread.thread_id);
self.upsert_agent_picker_thread(
thread.thread_id,
thread.agent_nickname,
@@ -6971,6 +7146,28 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn open_agent_picker_prunes_terminal_metadata_only_threads() -> Result<()> {
let mut app = make_test_app().await;
let mut app_server =
crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let thread_id = ThreadId::new();
app.agent_navigation.upsert(
thread_id,
Some("Ghost".to_string()),
Some("worker".to_string()),
/*is_closed*/ false,
);
app.open_agent_picker(&mut app_server).await;
assert_eq!(app.agent_navigation.get(&thread_id), None);
assert!(app.agent_navigation.is_empty());
Ok(())
}
#[tokio::test]
async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> {
let mut app = make_test_app().await;
@@ -7041,6 +7238,19 @@ mod tests {
));
}
#[test]
fn include_turns_fallback_detection_handles_unmaterialized_and_ephemeral_threads() {
let unmaterialized = color_eyre::eyre::eyre!(
"thread/read failed during TUI session lookup: thread/read failed: thread thr_123 is not materialized yet; includeTurns is unavailable before first user message"
);
let ephemeral = color_eyre::eyre::eyre!(
"thread/read failed during TUI session lookup: thread/read failed: ephemeral threads do not support includeTurns"
);
assert!(App::can_fallback_from_include_turns_error(&unmaterialized));
assert!(App::can_fallback_from_include_turns_error(&ephemeral));
}
#[tokio::test]
async fn open_agent_picker_marks_loaded_threads_open() -> Result<()> {
let mut app = make_test_app().await;
@@ -7068,6 +7278,122 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn attach_live_thread_for_selection_rejects_empty_non_ephemeral_fallback_threads()
-> Result<()> {
let mut app = make_test_app().await;
let mut app_server =
crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let started = app_server
.start_thread(app.chat_widget.config_ref())
.await?;
let thread_id = started.session.thread_id;
app.agent_navigation.upsert(
thread_id,
Some("Scout".to_string()),
Some("worker".to_string()),
/*is_closed*/ false,
);
let err = app
.attach_live_thread_for_selection(&mut app_server, thread_id)
.await
.expect_err("empty fallback should not attach as a blank replay-only thread");
assert_eq!(
err.to_string(),
format!("Agent thread {thread_id} is not yet available for replay or live attach.")
);
assert!(!app.thread_event_channels.contains_key(&thread_id));
Ok(())
}
#[tokio::test]
async fn attach_live_thread_for_selection_rejects_unmaterialized_fallback_threads() -> Result<()>
{
let mut app = make_test_app().await;
let mut app_server =
crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let mut ephemeral_config = app.chat_widget.config_ref().clone();
ephemeral_config.ephemeral = true;
let started = app_server.start_thread(&ephemeral_config).await?;
let thread_id = started.session.thread_id;
app.agent_navigation.upsert(
thread_id,
Some("Scout".to_string()),
Some("worker".to_string()),
/*is_closed*/ false,
);
let err = app
.attach_live_thread_for_selection(&mut app_server, thread_id)
.await
.expect_err("ephemeral fallback should not attach as a blank live thread");
assert_eq!(
err.to_string(),
format!("Agent thread {thread_id} is not yet available for replay or live attach.")
);
assert!(!app.thread_event_channels.contains_key(&thread_id));
Ok(())
}
#[tokio::test]
async fn should_attach_live_thread_for_selection_skips_closed_metadata_only_threads() {
let mut app = make_test_app().await;
let thread_id = ThreadId::new();
app.agent_navigation.upsert(
thread_id,
Some("Ghost".to_string()),
Some("worker".to_string()),
/*is_closed*/ true,
);
assert!(!app.should_attach_live_thread_for_selection(thread_id));
app.agent_navigation.upsert(
thread_id,
Some("Ghost".to_string()),
Some("worker".to_string()),
/*is_closed*/ false,
);
assert!(app.should_attach_live_thread_for_selection(thread_id));
app.thread_event_channels
.insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1));
assert!(!app.should_attach_live_thread_for_selection(thread_id));
}
#[tokio::test]
async fn refresh_agent_picker_thread_liveness_prunes_closed_metadata_only_threads() -> Result<()>
{
let mut app = make_test_app().await;
let mut app_server =
crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let thread_id = ThreadId::new();
app.agent_navigation.upsert(
thread_id,
Some("Ghost".to_string()),
Some("worker".to_string()),
/*is_closed*/ false,
);
let is_available = app
.refresh_agent_picker_thread_liveness(&mut app_server, thread_id)
.await;
assert!(!is_available);
assert_eq!(app.agent_navigation.get(&thread_id), None);
assert!(!app.thread_event_channels.contains_key(&thread_id));
Ok(())
}
#[tokio::test]
async fn open_agent_picker_prompts_to_enable_multi_agent_when_disabled() -> Result<()> {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
+18
View File
@@ -122,6 +122,16 @@ impl AgentNavigationState {
self.order.clear();
}
/// Removes a tracked thread entirely from picker metadata and traversal order.
///
/// This is reserved for entries that were only discovered opportunistically and never became
/// replayable local threads. Keeping those around after the backend confirms they are gone
/// would leave ghost rows in `/agent`.
pub(crate) fn remove(&mut self, thread_id: ThreadId) {
self.threads.remove(&thread_id);
self.order.retain(|candidate| *candidate != thread_id);
}
/// Returns whether there is at least one tracked thread other than the primary one.
///
/// `App` uses this to decide whether the picker should be available even when the collaboration
@@ -145,6 +155,14 @@ impl AgentNavigationState {
.collect()
}
/// Returns tracked thread ids in the same stable order used by the picker.
pub(crate) fn tracked_thread_ids(&self) -> Vec<ThreadId> {
self.ordered_threads()
.into_iter()
.map(|(thread_id, _)| thread_id)
.collect()
}
/// Returns the adjacent thread id for keyboard navigation in stable spawn order.
///
/// The caller must pass the thread whose transcript is actually being shown to the user, not