Remove core protocol dependency [1/2] (#20324)

## Why

This stack moves `codex-tui` away from the core protocol event surface
and toward app-server API shapes plus TUI-owned local models. This first
PR sets up the lower-risk foundation: it introduces the local model
surface and extracts app-server event routing into focused TUI modules
while preserving the existing behavior for the larger migration in PR2.

This PR is part 1 of a 2-PR stack:

1. Add TUI-owned replacement models and extract app-server event
routing.
2. Move the active TUI flow to app-server notifications and delete
obsolete adapter code.

## What changed

- Added TUI-owned approval, diff, session state, session resume, token
usage, and user-message models.
- Added `app/app_server_event_targets.rs` and `app/app_server_events.rs`
to hold app-server event targeting and dispatch logic outside `app.rs`.
- Updated app/status tests to use the local model layer and added
focused routing coverage.
- Boxed a few large async TUI test futures so this base layer remains
checkable without overflowing the default test stack.

## Verification

- `cargo check -p codex-tui --tests`
This commit is contained in:
Eric Traut
2026-04-30 10:52:19 -07:00
committed by GitHub
Unverified
parent 487716ae74
commit c70cdc108f
11 changed files with 1294 additions and 132 deletions
+2 -1
View File
@@ -183,7 +183,8 @@ use tokio::task::JoinHandle;
use toml::Value as TomlValue;
use uuid::Uuid;
mod agent_navigation;
mod app_server_adapter;
mod app_server_event_targets;
mod app_server_events;
pub(crate) mod app_server_requests;
mod background_requests;
mod config_persistence;
@@ -0,0 +1,218 @@
//! Thread targeting helpers for app-server requests and notifications.
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_protocol::ThreadId;
pub(super) fn server_request_thread_id(request: &ServerRequest) -> Option<ThreadId> {
match request {
ServerRequest::CommandExecutionRequestApproval { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::FileChangeRequestApproval { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::ToolRequestUserInput { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::McpServerElicitationRequest { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::PermissionsRequestApproval { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::DynamicToolCall { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::ChatgptAuthTokensRefresh { .. }
| ServerRequest::ApplyPatchApproval { .. }
| ServerRequest::ExecCommandApproval { .. } => None,
}
}
#[derive(Debug, PartialEq, Eq)]
pub(super) enum ServerNotificationThreadTarget {
Thread(ThreadId),
InvalidThreadId(String),
Global,
}
pub(super) fn server_notification_thread_target(
notification: &ServerNotification,
) -> ServerNotificationThreadTarget {
let thread_id = match notification {
ServerNotification::Error(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadStarted(notification) => Some(notification.thread.id.as_str()),
ServerNotification::ThreadStatusChanged(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadArchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadUnarchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadClosed(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadNameUpdated(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadTokenUsageUpdated(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadGoalUpdated(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadGoalCleared(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::TurnStarted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::HookStarted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::TurnCompleted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::HookCompleted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::TurnDiffUpdated(notification) => Some(notification.thread_id.as_str()),
ServerNotification::TurnPlanUpdated(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ItemStarted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ItemGuardianApprovalReviewStarted(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ItemGuardianApprovalReviewCompleted(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ItemCompleted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::RawResponseItemCompleted(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::AgentMessageDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::PlanDelta(notification) => Some(notification.thread_id.as_str()),
ServerNotification::CommandExecutionOutputDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::TerminalInteraction(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::FileChangeOutputDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::FileChangePatchUpdated(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ServerRequestResolved(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::McpToolCallProgress(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ReasoningSummaryTextDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ReasoningSummaryPartAdded(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ReasoningTextDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ContextCompacted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ModelRerouted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ModelVerification(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeStarted(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeItemAdded(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeTranscriptDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeTranscriptDone(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeOutputAudioDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeSdp(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeError(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeClosed(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::Warning(notification) => notification.thread_id.as_deref(),
ServerNotification::GuardianWarning(notification) => Some(notification.thread_id.as_str()),
ServerNotification::SkillsChanged(_)
| ServerNotification::McpServerStatusUpdated(_)
| ServerNotification::McpServerOauthLoginCompleted(_)
| ServerNotification::AccountUpdated(_)
| ServerNotification::AccountRateLimitsUpdated(_)
| ServerNotification::AppListUpdated(_)
| ServerNotification::RemoteControlStatusChanged(_)
| ServerNotification::ExternalAgentConfigImportCompleted(_)
| ServerNotification::DeprecationNotice(_)
| ServerNotification::ConfigWarning(_)
| ServerNotification::FuzzyFileSearchSessionUpdated(_)
| ServerNotification::FuzzyFileSearchSessionCompleted(_)
| ServerNotification::CommandExecOutputDelta(_)
| ServerNotification::FsChanged(_)
| ServerNotification::WindowsWorldWritableWarning(_)
| ServerNotification::WindowsSandboxSetupCompleted(_)
| ServerNotification::AccountLoginCompleted(_) => None,
};
match thread_id {
Some(thread_id) => match ThreadId::from_string(thread_id) {
Ok(thread_id) => ServerNotificationThreadTarget::Thread(thread_id),
Err(_) => ServerNotificationThreadTarget::InvalidThreadId(thread_id.to_string()),
},
None => ServerNotificationThreadTarget::Global,
}
}
#[cfg(test)]
mod tests {
use super::ServerNotificationThreadTarget;
use super::server_notification_thread_target;
use codex_app_server_protocol::GuardianWarningNotification;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::WarningNotification;
use codex_protocol::ThreadId;
use pretty_assertions::assert_eq;
#[test]
fn warning_notifications_without_threads_are_global() {
let notification = ServerNotification::Warning(WarningNotification {
thread_id: None,
message: "warning".to_string(),
});
let target = server_notification_thread_target(&notification);
assert_eq!(target, ServerNotificationThreadTarget::Global);
}
#[test]
fn warning_notifications_route_to_threads_when_thread_id_is_present() {
let thread_id = ThreadId::new();
let notification = ServerNotification::Warning(WarningNotification {
thread_id: Some(thread_id.to_string()),
message: "warning".to_string(),
});
let target = server_notification_thread_target(&notification);
assert_eq!(target, ServerNotificationThreadTarget::Thread(thread_id));
}
#[test]
fn guardian_warning_notifications_route_to_threads() {
let thread_id = ThreadId::new();
let notification = ServerNotification::GuardianWarning(GuardianWarningNotification {
thread_id: thread_id.to_string(),
message: "warning".to_string(),
});
let target = server_notification_thread_target(&notification);
assert_eq!(target, ServerNotificationThreadTarget::Thread(thread_id));
}
}
+208
View File
@@ -0,0 +1,208 @@
//! App-server event stream handling for the TUI app.
use super::App;
use super::app_server_event_targets::ServerNotificationThreadTarget;
use super::app_server_event_targets::server_notification_thread_target;
use super::app_server_event_targets::server_request_thread_id;
use crate::app_command::AppCommand;
use crate::app_event::AppEvent;
use crate::app_server_session::AppServerSession;
use crate::app_server_session::app_server_rate_limit_snapshot_to_core;
use crate::app_server_session::status_account_display_from_auth_mode;
use codex_app_server_client::AppServerEvent;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
impl App {
fn refresh_mcp_startup_expected_servers_from_config(&mut self) {
let enabled_config_mcp_servers: Vec<String> = self
.chat_widget
.config_ref()
.mcp_servers
.get()
.iter()
.filter_map(|(name, server)| server.enabled.then_some(name.clone()))
.collect();
self.chat_widget
.set_mcp_startup_expected_servers(enabled_config_mcp_servers);
}
pub(super) async fn handle_app_server_event(
&mut self,
app_server_client: &AppServerSession,
event: AppServerEvent,
) {
match event {
AppServerEvent::Lagged { skipped } => {
tracing::warn!(
skipped,
"app-server event consumer lagged; dropping ignored events"
);
self.refresh_mcp_startup_expected_servers_from_config();
self.chat_widget.finish_mcp_startup_after_lag();
}
AppServerEvent::ServerNotification(notification) => {
self.handle_server_notification_event(app_server_client, notification)
.await;
}
AppServerEvent::ServerRequest(request) => {
self.handle_server_request_event(app_server_client, request)
.await;
}
AppServerEvent::Disconnected { message } => {
tracing::warn!("app-server event stream disconnected: {message}");
self.chat_widget.add_error_message(message.clone());
self.app_event_tx.send(AppEvent::FatalExitRequest(message));
}
}
}
async fn handle_server_notification_event(
&mut self,
app_server_client: &AppServerSession,
notification: ServerNotification,
) {
match &notification {
ServerNotification::ServerRequestResolved(notification) => {
if let Some(request) = self
.pending_app_server_requests
.resolve_notification(&notification.request_id)
{
self.chat_widget.dismiss_app_server_request(&request);
}
}
ServerNotification::McpServerStatusUpdated(_) => {
self.refresh_mcp_startup_expected_servers_from_config();
}
ServerNotification::AccountRateLimitsUpdated(notification) => {
self.chat_widget.on_rate_limit_snapshot(Some(
app_server_rate_limit_snapshot_to_core(notification.rate_limits.clone()),
));
return;
}
ServerNotification::AccountUpdated(notification) => {
self.chat_widget.update_account_state(
status_account_display_from_auth_mode(
notification.auth_mode,
notification.plan_type,
),
notification.plan_type,
matches!(
notification.auth_mode,
Some(AuthMode::Chatgpt) | Some(AuthMode::ChatgptAuthTokens)
),
);
return;
}
ServerNotification::ExternalAgentConfigImportCompleted(_) => {
let cwd = self.chat_widget.config_ref().cwd.to_path_buf();
if let Err(err) = self.refresh_in_memory_config_from_disk().await {
tracing::warn!(
error = %err,
"failed to refresh config after external agent config import"
);
}
self.chat_widget.refresh_plugin_mentions();
self.chat_widget.submit_op(AppCommand::reload_user_config());
self.fetch_plugins_list(app_server_client, cwd);
return;
}
_ => {}
}
match server_notification_thread_target(&notification) {
ServerNotificationThreadTarget::Thread(thread_id) => {
let result = if self.primary_thread_id == Some(thread_id)
|| self.primary_thread_id.is_none()
{
self.enqueue_primary_thread_notification(notification).await
} else {
self.enqueue_thread_notification(thread_id, notification)
.await
};
if let Err(err) = result {
tracing::warn!("failed to enqueue app-server notification: {err}");
}
return;
}
ServerNotificationThreadTarget::InvalidThreadId(thread_id) => {
tracing::warn!(
thread_id,
"ignoring app-server notification with invalid thread_id"
);
return;
}
ServerNotificationThreadTarget::Global => {}
}
self.chat_widget
.handle_server_notification(notification, /*replay_kind*/ None);
}
async fn handle_server_request_event(
&mut self,
app_server_client: &AppServerSession,
request: ServerRequest,
) {
if let Some(unsupported) = self
.pending_app_server_requests
.note_server_request(&request)
{
tracing::warn!(
request_id = ?unsupported.request_id,
message = unsupported.message,
"rejecting unsupported app-server request"
);
self.chat_widget
.add_error_message(unsupported.message.clone());
if let Err(err) = self
.reject_app_server_request(
app_server_client,
unsupported.request_id,
unsupported.message,
)
.await
{
tracing::warn!("{err}");
}
return;
}
let Some(thread_id) = server_request_thread_id(&request) else {
tracing::warn!("ignoring threadless app-server request");
return;
};
let result =
if self.primary_thread_id == Some(thread_id) || self.primary_thread_id.is_none() {
self.enqueue_primary_thread_request(request).await
} else {
self.enqueue_thread_request(thread_id, request).await
};
if let Err(err) = result {
tracing::warn!("failed to enqueue app-server request: {err}");
}
}
async fn reject_app_server_request(
&self,
app_server_client: &AppServerSession,
request_id: codex_app_server_protocol::RequestId,
reason: String,
) -> std::result::Result<(), String> {
app_server_client
.reject_server_request(
request_id,
JSONRPCErrorError {
code: -32000,
message: reason,
data: None,
},
)
.await
.map_err(|err| format!("failed to reject app-server request: {err}"))
}
}
+172 -131
View File
@@ -1227,15 +1227,17 @@ async fn token_usage_update_refreshes_status_line_with_runtime_context_window()
#[tokio::test]
async fn open_agent_picker_keeps_missing_threads_for_replay() -> Result<()> {
let mut app = make_test_app().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let mut app = Box::pin(make_test_app()).await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let thread_id = ThreadId::new();
app.thread_event_channels
.insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1));
app.open_agent_picker(&mut app_server).await;
Box::pin(app.open_agent_picker(&mut app_server)).await;
assert_eq!(app.thread_event_channels.contains_key(&thread_id), true);
assert_eq!(
@@ -1252,10 +1254,12 @@ async fn open_agent_picker_keeps_missing_threads_for_replay() -> Result<()> {
#[tokio::test]
async fn open_agent_picker_preserves_cached_metadata_for_replay_threads() -> Result<()> {
let mut app = make_test_app().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let mut app = Box::pin(make_test_app()).await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let thread_id = ThreadId::new();
app.thread_event_channels
.insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1));
@@ -1266,7 +1270,7 @@ async fn open_agent_picker_preserves_cached_metadata_for_replay_threads() -> Res
/*is_closed*/ true,
);
app.open_agent_picker(&mut app_server).await;
Box::pin(app.open_agent_picker(&mut app_server)).await;
assert_eq!(app.thread_event_channels.contains_key(&thread_id), true);
assert_eq!(
@@ -1282,10 +1286,12 @@ async fn open_agent_picker_preserves_cached_metadata_for_replay_threads() -> Res
#[tokio::test]
async fn open_agent_picker_prunes_terminal_metadata_only_threads() -> Result<()> {
let mut app = make_test_app().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let mut app = Box::pin(make_test_app()).await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let thread_id = ThreadId::new();
app.agent_navigation.upsert(
thread_id,
@@ -1294,7 +1300,7 @@ async fn open_agent_picker_prunes_terminal_metadata_only_threads() -> Result<()>
/*is_closed*/ false,
);
app.open_agent_picker(&mut app_server).await;
Box::pin(app.open_agent_picker(&mut app_server)).await;
assert_eq!(app.agent_navigation.get(&thread_id), None);
assert!(app.agent_navigation.is_empty());
@@ -1303,10 +1309,12 @@ async fn open_agent_picker_prunes_terminal_metadata_only_threads() -> Result<()>
#[tokio::test]
async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> {
let mut app = make_test_app().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let mut app = Box::pin(make_test_app()).await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let thread_id = ThreadId::new();
app.thread_event_channels
.insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1));
@@ -1317,7 +1325,7 @@ async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> {
/*is_closed*/ false,
);
app.open_agent_picker(&mut app_server).await;
Box::pin(app.open_agent_picker(&mut app_server)).await;
assert_eq!(
app.agent_navigation.get(&thread_id),
@@ -1332,10 +1340,12 @@ async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> {
#[tokio::test]
async fn open_agent_picker_marks_loaded_threads_open() -> Result<()> {
let mut app = make_test_app().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let mut app = Box::pin(make_test_app()).await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let started = app_server
.start_thread(app.chat_widget.config_ref())
.await?;
@@ -1343,7 +1353,7 @@ async fn open_agent_picker_marks_loaded_threads_open() -> Result<()> {
app.thread_event_channels
.insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1));
app.open_agent_picker(&mut app_server).await;
Box::pin(app.open_agent_picker(&mut app_server)).await;
assert_eq!(
app.agent_navigation.get(&thread_id),
@@ -1356,65 +1366,87 @@ async fn open_agent_picker_marks_loaded_threads_open() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn attach_live_thread_for_selection_rejects_empty_non_ephemeral_fallback_threads()
-> Result<()> {
let mut app = make_test_app().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let started = app_server
.start_thread(app.chat_widget.config_ref())
.await?;
let thread_id = started.session.thread_id;
app.agent_navigation.upsert(
thread_id,
Some("Scout".to_string()),
Some("worker".to_string()),
/*is_closed*/ false,
);
#[test]
fn attach_live_thread_for_selection_rejects_empty_non_ephemeral_fallback_threads() -> Result<()> {
const WORKER_THREADS: usize = 1;
const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024;
let err = app
.attach_live_thread_for_selection(&mut app_server, thread_id)
.await
.expect_err("empty fallback should not attach as a blank replay-only thread");
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORKER_THREADS)
.thread_stack_size(TEST_STACK_SIZE_BYTES)
.enable_all()
.build()?;
assert_eq!(
err.to_string(),
format!("Agent thread {thread_id} is not yet available for replay or live attach.")
);
assert!(!app.thread_event_channels.contains_key(&thread_id));
Ok(())
runtime.block_on(async {
let config = {
let app = make_test_app().await;
app.chat_widget.config_ref().clone()
};
let mut app_server = crate::start_embedded_app_server_for_picker(&config)
.await
.expect("embedded app server");
let started = app_server.start_thread(&config).await?;
let thread_id = started.session.thread_id;
let mut app = make_test_app().await;
app.agent_navigation.upsert(
thread_id,
Some("Scout".to_string()),
Some("worker".to_string()),
/*is_closed*/ false,
);
let err = app
.attach_live_thread_for_selection(&mut app_server, thread_id)
.await
.expect_err("empty fallback should not attach as a blank replay-only thread");
assert_eq!(
err.to_string(),
format!("Agent thread {thread_id} is not yet available for replay or live attach.")
);
assert!(!app.thread_event_channels.contains_key(&thread_id));
Ok(())
})
}
#[tokio::test]
async fn attach_live_thread_for_selection_rejects_unmaterialized_fallback_threads() -> Result<()> {
let mut app = make_test_app().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let mut ephemeral_config = app.chat_widget.config_ref().clone();
ephemeral_config.ephemeral = true;
let started = app_server.start_thread(&ephemeral_config).await?;
let thread_id = started.session.thread_id;
app.agent_navigation.upsert(
thread_id,
Some("Scout".to_string()),
Some("worker".to_string()),
/*is_closed*/ false,
);
#[test]
fn attach_live_thread_for_selection_rejects_unmaterialized_fallback_threads() -> Result<()> {
const WORKER_THREADS: usize = 1;
const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024;
let err = app
.attach_live_thread_for_selection(&mut app_server, thread_id)
.await
.expect_err("ephemeral fallback should not attach as a blank live thread");
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(WORKER_THREADS)
.thread_stack_size(TEST_STACK_SIZE_BYTES)
.enable_all()
.build()?;
assert_eq!(
err.to_string(),
format!("Agent thread {thread_id} is not yet available for replay or live attach.")
);
assert!(!app.thread_event_channels.contains_key(&thread_id));
Ok(())
runtime.block_on(async {
let mut app = make_test_app().await;
let mut app_server =
crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref()).await?;
let mut ephemeral_config = app.chat_widget.config_ref().clone();
ephemeral_config.ephemeral = true;
let started = app_server.start_thread(&ephemeral_config).await?;
let thread_id = started.session.thread_id;
app.agent_navigation.upsert(
thread_id,
Some("Scout".to_string()),
Some("worker".to_string()),
/*is_closed*/ false,
);
let err = app
.attach_live_thread_for_selection(&mut app_server, thread_id)
.await
.expect_err("ephemeral fallback should not attach as a blank live thread");
assert_eq!(
err.to_string(),
format!("Agent thread {thread_id} is not yet available for replay or live attach.")
);
assert!(!app.thread_event_channels.contains_key(&thread_id));
Ok(())
})
}
#[tokio::test]
@@ -1445,10 +1477,12 @@ async fn should_attach_live_thread_for_selection_skips_closed_metadata_only_thre
#[tokio::test]
async fn refresh_agent_picker_thread_liveness_prunes_closed_metadata_only_threads() -> Result<()> {
let mut app = make_test_app().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let mut app = Box::pin(make_test_app()).await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let thread_id = ThreadId::new();
app.agent_navigation.upsert(
thread_id,
@@ -1457,9 +1491,8 @@ async fn refresh_agent_picker_thread_liveness_prunes_closed_metadata_only_thread
/*is_closed*/ false,
);
let is_available = app
.refresh_agent_picker_thread_liveness(&mut app_server, thread_id)
.await;
let is_available =
Box::pin(app.refresh_agent_picker_thread_liveness(&mut app_server, thread_id)).await;
assert!(!is_available);
assert_eq!(app.agent_navigation.get(&thread_id), None);
@@ -1469,13 +1502,15 @@ async fn refresh_agent_picker_thread_liveness_prunes_closed_metadata_only_thread
#[tokio::test]
async fn open_agent_picker_prompts_to_enable_multi_agent_when_disabled() -> Result<()> {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let (mut app, mut app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let _ = app.config.features.disable(Feature::Collab);
app.open_agent_picker(&mut app_server).await;
Box::pin(app.open_agent_picker(&mut app_server)).await;
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
@@ -1499,16 +1534,16 @@ async fn open_agent_picker_prompts_to_enable_multi_agent_when_disabled() -> Resu
#[tokio::test]
async fn update_memory_settings_persists_and_updates_widget_config() -> Result<()> {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let (mut app, _app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await;
let codex_home = tempdir()?;
app.config.codex_home = codex_home.path().to_path_buf().abs();
let mut app_server = crate::start_embedded_app_server_for_picker(&app.config).await?;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(&app.config)).await?;
app.update_memory_settings_with_app_server(
Box::pin(app.update_memory_settings_with_app_server(
&mut app_server,
/*use_memories*/ false,
/*generate_memories*/ false,
)
))
.await;
assert!(!app.config.memories.use_memories);
@@ -1542,22 +1577,22 @@ async fn update_memory_settings_persists_and_updates_widget_config() -> Result<(
#[tokio::test]
async fn update_memory_settings_updates_current_thread_memory_mode() -> Result<()> {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let (mut app, _app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await;
let codex_home = tempdir()?;
app.config.codex_home = codex_home.path().to_path_buf().abs();
// Seed the previous setting so this test exercises the thread-mode update path.
app.config.memories.generate_memories = true;
let mut app_server = crate::start_embedded_app_server_for_picker(&app.config).await?;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(&app.config)).await?;
let started = app_server.start_thread(&app.config).await?;
let thread_id = started.session.thread_id;
app.active_thread_id = Some(thread_id);
app.update_memory_settings_with_app_server(
Box::pin(app.update_memory_settings_with_app_server(
&mut app_server,
/*use_memories*/ true,
/*generate_memories*/ false,
)
))
.await;
let state_db = codex_state::StateRuntime::init(
@@ -1578,7 +1613,7 @@ async fn update_memory_settings_updates_current_thread_memory_mode() -> Result<(
#[tokio::test]
async fn reset_memories_clears_local_memory_directories() -> Result<()> {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let (mut app, _app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await;
let codex_home = tempdir()?;
app.config.codex_home = codex_home.path().to_path_buf().abs();
app.config.sqlite_home = codex_home.path().to_path_buf();
@@ -1594,9 +1629,9 @@ async fn reset_memories_clears_local_memory_directories() -> Result<()> {
)?;
std::fs::write(extensions_root.join("stale.txt"), "stale extension\n")?;
let mut app_server = crate::start_embedded_app_server_for_picker(&app.config).await?;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(&app.config)).await?;
app.reset_memories_with_app_server(&mut app_server).await;
Box::pin(app.reset_memories_with_app_server(&mut app_server)).await;
assert_eq!(std::fs::read_dir(&memory_root)?.count(), 0);
@@ -2138,15 +2173,17 @@ async fn update_feature_flags_disabling_guardian_in_profile_keeps_inherited_non_
#[tokio::test]
async fn open_agent_picker_allows_existing_agent_threads_when_feature_is_disabled() -> Result<()> {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let (mut app, mut app_event_rx, _op_rx) = Box::pin(make_test_app_with_channels()).await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let thread_id = ThreadId::new();
app.thread_event_channels
.insert(thread_id, ThreadEventChannel::new(/*capacity*/ 1));
app.open_agent_picker(&mut app_server).await;
Box::pin(app.open_agent_picker(&mut app_server)).await;
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
@@ -4856,7 +4893,7 @@ async fn thread_rollback_response_discards_queued_active_thread_events() {
#[tokio::test]
async fn new_session_requests_shutdown_for_previous_conversation() {
let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await;
let (mut app, mut app_event_rx, mut op_rx) = Box::pin(make_test_app_with_channels()).await;
let thread_id = ThreadId::new();
let event = SessionConfiguredEvent {
@@ -4887,10 +4924,12 @@ async fn new_session_requests_shutdown_for_previous_conversation() {
while app_event_rx.try_recv().is_ok() {}
while op_rx.try_recv().is_ok() {}
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
app.shutdown_current_thread(&mut app_server).await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
Box::pin(app.shutdown_current_thread(&mut app_server)).await;
assert!(
op_rx.try_recv().is_err(),
@@ -4904,12 +4943,12 @@ async fn shutdown_first_exit_returns_immediate_exit_when_shutdown_submit_fails()
let thread_id = ThreadId::new();
app.active_thread_id = Some(thread_id);
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let control = app
.handle_exit_mode(&mut app_server, ExitMode::ShutdownFirst)
.await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let control = Box::pin(app.handle_exit_mode(&mut app_server, ExitMode::ShutdownFirst)).await;
assert_eq!(app.pending_shutdown_exit_thread_id, None);
assert!(matches!(
@@ -4920,16 +4959,16 @@ async fn shutdown_first_exit_returns_immediate_exit_when_shutdown_submit_fails()
#[tokio::test]
async fn shutdown_first_exit_uses_app_server_shutdown_without_submitting_op() {
let (mut app, _app_event_rx, mut op_rx) = make_test_app_with_channels().await;
let (mut app, _app_event_rx, mut op_rx) = Box::pin(make_test_app_with_channels()).await;
let thread_id = ThreadId::new();
app.active_thread_id = Some(thread_id);
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let control = app
.handle_exit_mode(&mut app_server, ExitMode::ShutdownFirst)
.await;
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let control = Box::pin(app.handle_exit_mode(&mut app_server, ExitMode::ShutdownFirst)).await;
assert_eq!(app.pending_shutdown_exit_thread_id, None);
assert!(matches!(
@@ -4945,9 +4984,11 @@ async fn shutdown_first_exit_uses_app_server_shutdown_without_submitting_op() {
#[tokio::test]
async fn interrupt_without_active_turn_is_treated_as_handled() {
let mut app = make_test_app().await;
let mut app_server = crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
app.chat_widget.config_ref(),
))
.await
.expect("embedded app server");
let started = app_server
.start_thread(app.chat_widget.config_ref())
.await
@@ -4958,10 +4999,10 @@ async fn interrupt_without_active_turn_is_treated_as_handled() {
.expect("primary thread should be registered");
let op = AppCommand::interrupt();
let handled = app
.try_submit_active_thread_op_via_app_server(&mut app_server, thread_id, &op)
.await
.expect("interrupt submission should not fail");
let handled =
Box::pin(app.try_submit_active_thread_op_via_app_server(&mut app_server, thread_id, &op))
.await
.expect("interrupt submission should not fail");
assert_eq!(handled, true);
}
+119
View File
@@ -0,0 +1,119 @@
//! TUI-owned approval request models used while rendering and queueing prompts.
//!
//! These structs normalize app-server request params into the shape the TUI
//! needs while an approval may be deferred behind streaming output. Exec
//! approvals keep app-server decision and permission types; patch approvals add
//! the file-change display model collected from nearby thread items.
use std::collections::HashMap;
use std::path::PathBuf;
use crate::diff_model::FileChange;
use codex_app_server_protocol::AdditionalPermissionProfile;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::ExecPolicyAmendment;
use codex_app_server_protocol::NetworkApprovalContext;
use codex_app_server_protocol::NetworkPolicyAmendment;
use codex_app_server_protocol::NetworkPolicyRuleAction;
use codex_utils_absolute_path::AbsolutePathBuf;
use serde::Deserialize;
use serde::Serialize;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ExecApprovalRequestEvent {
pub(crate) call_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) approval_id: Option<String>,
#[serde(default)]
pub(crate) turn_id: String,
pub(crate) command: Vec<String>,
pub(crate) cwd: AbsolutePathBuf,
pub(crate) reason: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) proposed_execpolicy_amendment: Option<ExecPolicyAmendment>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) proposed_network_policy_amendments: Option<Vec<NetworkPolicyAmendment>>,
#[serde(default)]
pub(crate) available_decisions: Option<Vec<CommandExecutionApprovalDecision>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) network_approval_context: Option<NetworkApprovalContext>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) additional_permissions: Option<AdditionalPermissionProfile>,
}
impl ExecApprovalRequestEvent {
pub(crate) fn effective_approval_id(&self) -> String {
self.approval_id
.clone()
.unwrap_or_else(|| self.call_id.clone())
}
pub(crate) fn effective_available_decisions(&self) -> Vec<CommandExecutionApprovalDecision> {
match &self.available_decisions {
Some(decisions) => decisions.clone(),
None => Self::default_available_decisions(
self.network_approval_context.as_ref(),
self.proposed_execpolicy_amendment.as_ref(),
self.proposed_network_policy_amendments.as_deref(),
self.additional_permissions.as_ref(),
),
}
}
pub(crate) fn default_available_decisions(
network_approval_context: Option<&NetworkApprovalContext>,
proposed_execpolicy_amendment: Option<&ExecPolicyAmendment>,
proposed_network_policy_amendments: Option<&[NetworkPolicyAmendment]>,
additional_permissions: Option<&AdditionalPermissionProfile>,
) -> Vec<CommandExecutionApprovalDecision> {
if network_approval_context.is_some() {
let mut decisions = vec![
CommandExecutionApprovalDecision::Accept,
CommandExecutionApprovalDecision::AcceptForSession,
];
if let Some(amendment) = proposed_network_policy_amendments.and_then(|amendments| {
amendments
.iter()
.find(|amendment| amendment.action == NetworkPolicyRuleAction::Allow)
}) {
decisions.push(
CommandExecutionApprovalDecision::ApplyNetworkPolicyAmendment {
network_policy_amendment: amendment.clone(),
},
);
}
decisions.push(CommandExecutionApprovalDecision::Cancel);
return decisions;
}
if additional_permissions.is_some() {
return vec![
CommandExecutionApprovalDecision::Accept,
CommandExecutionApprovalDecision::Cancel,
];
}
let mut decisions = vec![CommandExecutionApprovalDecision::Accept];
if let Some(prefix) = proposed_execpolicy_amendment {
decisions.push(
CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment {
execpolicy_amendment: prefix.clone(),
},
);
}
decisions.push(CommandExecutionApprovalDecision::Cancel);
decisions
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ApplyPatchApprovalRequestEvent {
pub(crate) call_id: String,
#[serde(default)]
pub(crate) turn_id: String,
pub(crate) changes: HashMap<PathBuf, FileChange>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) reason: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) grant_root: Option<PathBuf>,
}
@@ -0,0 +1,107 @@
//! User-message display models and helpers for the chat widget.
//!
//! The app-server preserves user input as structured chunks, while chat history
//! renders a single prompt row. This module owns that display projection and
//! the small compare key used to suppress duplicate rows for pending steers.
use std::path::PathBuf;
use codex_app_server_protocol::UserInput;
use codex_protocol::user_input::TextElement;
use super::ChatWidget;
use super::append_text_with_rebased_elements;
#[derive(Clone, Debug, PartialEq)]
pub(super) struct UserMessageDisplay {
pub(super) message: String,
pub(super) remote_image_urls: Vec<String>,
pub(super) local_images: Vec<PathBuf>,
pub(super) text_elements: Vec<TextElement>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(super) struct PendingSteerCompareKey {
pub(super) message: String,
pub(super) image_count: usize,
}
impl ChatWidget {
pub(super) fn user_message_display_from_parts(
message: String,
text_elements: Vec<TextElement>,
local_images: Vec<PathBuf>,
remote_image_urls: Vec<String>,
) -> UserMessageDisplay {
UserMessageDisplay {
message,
remote_image_urls,
local_images,
text_elements,
}
}
/// Build the compare key for a submitted pending steer without invoking the
/// expensive request-serialization path. Pending steers only need to match the
/// committed app-server `UserMessage` item emitted after input drains, which
/// preserves flattened text and total image count.
pub(super) fn pending_steer_compare_key_from_items(
items: &[UserInput],
) -> PendingSteerCompareKey {
let mut message = String::new();
let mut image_count = 0;
for item in items {
match item {
UserInput::Text { text, .. } => message.push_str(text),
UserInput::Image { .. } | UserInput::LocalImage { .. } => image_count += 1,
UserInput::Skill { .. } | UserInput::Mention { .. } => {}
}
}
PendingSteerCompareKey {
message,
image_count,
}
}
pub(super) fn user_message_display_from_inputs(items: &[UserInput]) -> UserMessageDisplay {
let mut message = String::new();
let mut remote_image_urls = Vec::new();
let mut local_images = Vec::new();
let mut text_elements = Vec::new();
for item in items {
match item {
UserInput::Text {
text,
text_elements: current_text_elements,
} => append_text_with_rebased_elements(
&mut message,
&mut text_elements,
text,
current_text_elements.iter().map(|element| {
let range = element.byte_range.clone();
TextElement::new(
range.clone().into(),
element
.placeholder()
.or_else(|| text.get(range.start..range.end))
.map(str::to_string),
)
}),
),
UserInput::Image { url } => remote_image_urls.push(url.clone()),
UserInput::LocalImage { path } => local_images.push(path.clone()),
UserInput::Skill { .. } | UserInput::Mention { .. } => {}
}
}
Self::user_message_display_from_parts(
message,
text_elements,
local_images,
remote_image_urls,
)
}
}
+21
View File
@@ -0,0 +1,21 @@
//! Minimal file-change model used by TUI diff rendering and approval previews.
use std::path::PathBuf;
use serde::Deserialize;
use serde::Serialize;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub(crate) enum FileChange {
Add {
content: String,
},
Delete {
content: String,
},
Update {
unified_diff: String,
move_path: Option<PathBuf>,
},
}
+314
View File
@@ -0,0 +1,314 @@
//! Resolve saved-session state needed before resuming or forking a thread.
//!
//! The app-server API owns normal thread lifecycle data. This module coordinates
//! the TUI-specific cwd prompt and falls back to local rollout metadata only
//! before the app server has resumed the selected thread.
use std::io;
use std::path::Path;
use std::path::PathBuf;
use crate::cwd_prompt;
use crate::cwd_prompt::CwdPromptAction;
use crate::cwd_prompt::CwdPromptOutcome;
use crate::cwd_prompt::CwdSelection;
use crate::legacy_core::config::Config;
use crate::tui::Tui;
use codex_protocol::ThreadId;
use codex_rollout::state_db::get_state_db;
use codex_utils_path as path_utils;
use serde::Deserialize;
use serde_json::Value;
use tokio::io::AsyncBufReadExt;
#[derive(Default)]
struct RolloutResumeState {
thread_id: Option<ThreadId>,
cwd: Option<PathBuf>,
model: Option<String>,
}
#[derive(Deserialize)]
struct SessionMetadata {
id: ThreadId,
cwd: PathBuf,
}
#[derive(Deserialize)]
struct TurnContextResumeState {
cwd: PathBuf,
model: String,
}
#[derive(Deserialize)]
struct RawRecord {
#[serde(rename = "type")]
item_type: String,
payload: Option<Value>,
}
pub(crate) enum ResolveCwdOutcome {
Continue(Option<PathBuf>),
Exit,
}
pub(crate) async fn resolve_session_thread_id(
path: &Path,
id_str_if_uuid: Option<&str>,
) -> Option<ThreadId> {
match id_str_if_uuid {
Some(id_str) => ThreadId::from_string(id_str).ok(),
None => read_rollout_resume_state(path)
.await
.ok()
.and_then(|state| state.thread_id),
}
}
pub(crate) async fn read_session_model(
config: &Config,
thread_id: ThreadId,
path: Option<&Path>,
) -> Option<String> {
if let Some(state_db_ctx) = get_state_db(config).await
&& let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await
&& let Some(model) = metadata.model
{
return Some(model);
}
let path = path?;
read_rollout_resume_state(path)
.await
.ok()
.and_then(|state| state.model)
}
pub(crate) async fn resolve_cwd_for_resume_or_fork(
tui: &mut Tui,
config: &Config,
current_cwd: &Path,
thread_id: ThreadId,
path: Option<&Path>,
action: CwdPromptAction,
allow_prompt: bool,
) -> color_eyre::Result<ResolveCwdOutcome> {
let Some(history_cwd) = read_session_cwd(config, thread_id, path).await else {
return Ok(ResolveCwdOutcome::Continue(None));
};
if allow_prompt && cwds_differ(current_cwd, &history_cwd) {
let selection_outcome =
cwd_prompt::run_cwd_selection_prompt(tui, action, current_cwd, &history_cwd).await?;
return Ok(match selection_outcome {
CwdPromptOutcome::Selection(CwdSelection::Current) => {
ResolveCwdOutcome::Continue(Some(current_cwd.to_path_buf()))
}
CwdPromptOutcome::Selection(CwdSelection::Session) => {
ResolveCwdOutcome::Continue(Some(history_cwd))
}
CwdPromptOutcome::Exit => ResolveCwdOutcome::Exit,
});
}
Ok(ResolveCwdOutcome::Continue(Some(history_cwd)))
}
async fn read_session_cwd(
config: &Config,
thread_id: ThreadId,
path: Option<&Path>,
) -> Option<PathBuf> {
if let Some(state_db_ctx) = get_state_db(config).await
&& let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await
{
return Some(metadata.cwd);
}
let path = path?;
match read_rollout_resume_state(path).await {
Ok(state) => state.cwd,
Err(err) => {
let rollout_path = path.display().to_string();
tracing::warn!(
%rollout_path,
%err,
"Failed to read session metadata from rollout"
);
None
}
}
}
pub(crate) fn cwds_differ(current_cwd: &Path, session_cwd: &Path) -> bool {
!path_utils::paths_match_after_normalization(current_cwd, session_cwd)
}
async fn read_rollout_resume_state(path: &Path) -> io::Result<RolloutResumeState> {
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut state = RolloutResumeState::default();
let mut saw_record = false;
while let Some(line) = lines.next_line().await? {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let Ok(record) = serde_json::from_str::<RawRecord>(trimmed) else {
continue;
};
saw_record = true;
let Some(payload) = record.payload else {
continue;
};
match record.item_type.as_str() {
"session_meta" if state.thread_id.is_none() => {
if let Ok(metadata) = serde_json::from_value::<SessionMetadata>(payload) {
state.thread_id = Some(metadata.id);
state.cwd.get_or_insert(metadata.cwd);
}
}
"turn_context" => {
if let Ok(turn_context) = serde_json::from_value::<TurnContextResumeState>(payload)
{
state.cwd = Some(turn_context.cwd);
state.model = Some(turn_context.model);
}
}
_ => {}
}
}
if saw_record {
Ok(state)
} else {
Err(io::Error::other(format!(
"rollout at {} is empty",
path.display()
)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
fn rollout_line(
timestamp: &str,
item_type: &str,
payload: serde_json::Value,
) -> serde_json::Value {
serde_json::json!({
"timestamp": timestamp,
"type": item_type,
"payload": payload,
})
}
fn write_rollout_lines(path: &Path, lines: &[serde_json::Value]) -> std::io::Result<()> {
let mut text = String::new();
for line in lines {
text.push_str(&serde_json::to_string(line).expect("serialize rollout"));
text.push('\n');
}
std::fs::write(path, text)
}
#[tokio::test]
async fn rollout_resume_state_prefers_latest_turn_context() -> std::io::Result<()> {
let temp_dir = TempDir::new()?;
let thread_id = ThreadId::new();
let original = temp_dir.path().join("original");
let latest = temp_dir.path().join("latest");
let rollout_path = temp_dir.path().join("rollout.jsonl");
write_rollout_lines(
&rollout_path,
&[
rollout_line(
"t0",
"session_meta",
serde_json::json!({
"id": thread_id,
"cwd": original,
"originator": "test",
"cli_version": "test",
}),
),
rollout_line(
"t1",
"turn_context",
serde_json::json!({ "cwd": temp_dir.path().join("middle"), "model": "middle" }),
),
rollout_line(
"t2",
"turn_context",
serde_json::json!({ "cwd": latest.clone(), "model": "latest" }),
),
],
)?;
let state = read_rollout_resume_state(&rollout_path).await?;
assert_eq!(state.thread_id, Some(thread_id));
assert_eq!(state.cwd, Some(latest));
assert_eq!(state.model, Some("latest".to_string()));
Ok(())
}
#[tokio::test]
async fn rollout_resume_state_falls_back_to_session_meta() -> std::io::Result<()> {
let temp_dir = TempDir::new()?;
let thread_id = ThreadId::new();
let cwd = temp_dir.path().join("session");
let rollout_path = temp_dir.path().join("rollout.jsonl");
write_rollout_lines(
&rollout_path,
&[rollout_line(
"t0",
"session_meta",
serde_json::json!({
"id": thread_id,
"cwd": cwd.clone(),
"originator": "test",
"cli_version": "test",
}),
)],
)?;
let state = read_rollout_resume_state(&rollout_path).await?;
assert_eq!(state.thread_id, Some(thread_id));
assert_eq!(state.cwd, Some(cwd));
assert_eq!(state.model, None);
Ok(())
}
#[tokio::test]
async fn rollout_resume_state_skips_malformed_lines() -> std::io::Result<()> {
let temp_dir = TempDir::new()?;
let thread_id = ThreadId::new();
let cwd = temp_dir.path().join("session");
let rollout_path = temp_dir.path().join("rollout.jsonl");
let valid_line = serde_json::to_string(&rollout_line(
"t0",
"session_meta",
serde_json::json!({
"id": thread_id,
"cwd": cwd.clone(),
"originator": "test",
"cli_version": "test",
}),
))
.expect("serialize rollout line");
std::fs::write(&rollout_path, format!("{valid_line}\n{{"))?;
let state = read_rollout_resume_state(&rollout_path).await?;
assert_eq!(state.thread_id, Some(thread_id));
assert_eq!(state.cwd, Some(cwd));
Ok(())
}
}
+45
View File
@@ -0,0 +1,45 @@
//! Canonical TUI session state shared across app-server routing, chat display, and status UI.
//!
//! The app-server API is the boundary for session lifecycle events. Once those responses enter
//! TUI, this module holds the small internal state shape used by app orchestration and widgets.
use std::path::PathBuf;
use codex_app_server_protocol::AskForApproval;
use codex_protocol::ThreadId;
use codex_protocol::models::ActivePermissionProfile;
use codex_protocol::models::PermissionProfile;
use codex_utils_absolute_path::AbsolutePathBuf;
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct SessionNetworkProxyRuntime {
pub(crate) http_addr: String,
pub(crate) socks_addr: String,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ThreadSessionState {
pub(crate) thread_id: ThreadId,
pub(crate) forked_from_id: Option<ThreadId>,
pub(crate) fork_parent_title: Option<String>,
pub(crate) thread_name: Option<String>,
pub(crate) model: String,
pub(crate) model_provider_id: String,
pub(crate) service_tier: Option<codex_protocol::config_types::ServiceTier>,
pub(crate) approval_policy: AskForApproval,
pub(crate) approvals_reviewer: codex_protocol::config_types::ApprovalsReviewer,
/// Canonical active permissions for this session. Legacy app-server
/// responses are converted to a profile at ingestion time using the
/// response cwd so cached sessions do not reinterpret cwd-bound grants.
pub(crate) permission_profile: PermissionProfile,
/// Named or implicit built-in profile that produced `permission_profile`,
/// when the server knows it.
pub(crate) active_permission_profile: Option<ActivePermissionProfile>,
pub(crate) cwd: AbsolutePathBuf,
pub(crate) instruction_source_paths: Vec<AbsolutePathBuf>,
pub(crate) reasoning_effort: Option<codex_protocol::openai_models::ReasoningEffort>,
pub(crate) history_log_id: u64,
pub(crate) history_entry_count: u64,
pub(crate) network_proxy: Option<SessionNetworkProxyRuntime>,
pub(crate) rollout_path: Option<PathBuf>,
}
+1
View File
@@ -37,6 +37,7 @@ async fn test_config(temp_home: &TempDir) -> Config {
.build()
.await
.expect("load config");
config.approvals_reviewer = ApprovalsReviewer::User;
config
.permissions
.set_permission_profile(PermissionProfile::workspace_write_with(
+87
View File
@@ -0,0 +1,87 @@
//! TUI token usage models and display formatting.
use std::fmt;
use codex_protocol::num_format::format_with_separators;
use serde::Deserialize;
use serde::Serialize;
const BASELINE_TOKENS: i64 = 12000;
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
pub input_tokens: i64,
pub cached_input_tokens: i64,
pub output_tokens: i64,
pub reasoning_output_tokens: i64,
pub total_tokens: i64,
}
impl TokenUsage {
pub fn is_zero(&self) -> bool {
self.total_tokens == 0
}
pub(crate) fn cached_input(&self) -> i64 {
self.cached_input_tokens.max(0)
}
pub(crate) fn non_cached_input(&self) -> i64 {
(self.input_tokens - self.cached_input()).max(0)
}
pub(crate) fn blended_total(&self) -> i64 {
(self.non_cached_input() + self.output_tokens.max(0)).max(0)
}
pub(crate) fn tokens_in_context_window(&self) -> i64 {
self.total_tokens
}
pub(crate) fn percent_of_context_window_remaining(&self, context_window: i64) -> i64 {
if context_window <= BASELINE_TOKENS {
return 0;
}
let effective_window = context_window - BASELINE_TOKENS;
let used = (self.tokens_in_context_window() - BASELINE_TOKENS).max(0);
let remaining = (effective_window - used).max(0);
((remaining as f64 / effective_window as f64) * 100.0)
.clamp(0.0, 100.0)
.round() as i64
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct TokenUsageInfo {
pub(crate) total_token_usage: TokenUsage,
pub(crate) last_token_usage: TokenUsage,
pub(crate) model_context_window: Option<i64>,
}
impl fmt::Display for TokenUsage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Token usage: total={} input={}{} output={}{}",
format_with_separators(self.blended_total()),
format_with_separators(self.non_cached_input()),
if self.cached_input() > 0 {
format!(
" (+ {} cached)",
format_with_separators(self.cached_input())
)
} else {
String::new()
},
format_with_separators(self.output_tokens),
if self.reasoning_output_tokens > 0 {
format!(
" (reasoning {})",
format_with_separators(self.reasoning_output_tokens)
)
} else {
String::new()
}
)
}
}