[codex] Move thread naming to app server (#21260)

## Why

Thread names are app-server metadata now, backed by the thread store and
sqlite state database. Keeping a core `SetThreadName` op plus a rollout
`thread_name_updated` event made rename persistence live in the wrong
layer and required historical replay support for an event that new
app-server flows should not write.

## What changed

- Removed `Op::SetThreadName` and `EventMsg::ThreadNameUpdated` from the
core protocol and deleted the core handler path that appended rename
events to rollouts.
- Updated app-server `thread/name/set` so both loaded and unloaded
threads write through thread-store metadata and app-server emits
`thread/name/updated` notifications.
- Updated local thread-store name metadata updates to write sqlite title
metadata and the legacy thread-name index without appending rollout
events.
- Removed state extraction and rollout handling for the deleted
thread-name event.

## Validation

- `cargo test -p codex-app-server thread_name_updated_broadcasts`
- `cargo test -p codex-app-server
thread_name_set_is_reflected_in_read_list_and_resume`
- `cargo test -p codex-thread-store
update_thread_metadata_sets_name_on_active_rollout_and_indexes_name`
- `cargo test -p codex-state`
- `cargo check -p codex-mcp-server -p codex-rollout-trace`
- `just fix -p codex-app-server -p codex-thread-store -p codex-state -p
codex-mcp-server -p codex-rollout-trace`

## Docs

No external documentation update is expected for this internal ownership
change.
This commit is contained in:
pakrym-oai
2026-05-05 17:16:06 -07:00
committed by GitHub
Unverified
parent 3ec18a2c0a
commit 2c1a361a2e
13 changed files with 32 additions and 215 deletions
@@ -53,7 +53,6 @@ use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SkillsChangedNotification;
use codex_app_server_protocol::ThreadGoalUpdatedNotification;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadNameUpdatedNotification;
use codex_app_server_protocol::ThreadRealtimeClosedNotification;
use codex_app_server_protocol::ThreadRealtimeErrorNotification;
use codex_app_server_protocol::ThreadRealtimeItemAddedNotification;
@@ -1207,17 +1206,6 @@ pub(crate) async fn apply_bespoke_event_handling(
outgoing.send_response(request_id, response).await;
}
}
EventMsg::ThreadNameUpdated(thread_name_event) => {
let notification = ThreadNameUpdatedNotification {
thread_id: thread_name_event.thread_id.to_string(),
thread_name: thread_name_event.thread_name,
};
outgoing
.send_global_server_notification(ServerNotification::ThreadNameUpdated(
notification,
))
.await;
}
EventMsg::ThreadGoalUpdated(thread_goal_event) => {
let notification = ThreadGoalUpdatedNotification {
thread_id: thread_goal_event.thread_id.to_string(),
@@ -36,7 +36,7 @@ use codex_external_agent_sessions::prepare_validated_session_imports;
use codex_external_agent_sessions::record_imported_session;
use codex_protocol::ThreadId;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::Op;
use codex_thread_store::ThreadMetadataPatch;
use std::collections::HashSet;
use std::path::PathBuf;
use tokio::sync::Semaphore;
@@ -320,7 +320,13 @@ impl ExternalAgentConfigRequestProcessor {
{
imported_thread
.thread
.submit(Op::SetThreadName { name })
.update_thread_metadata(
ThreadMetadataPatch {
name: Some(name),
..Default::default()
},
/*include_archived*/ false,
)
.await
.map_err(|err| internal_error(format!("failed to name imported session: {err}")))?;
}
@@ -405,10 +405,7 @@ impl ThreadRequestProcessor {
request_id: ConnectionRequestId,
params: ThreadSetNameParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
match self
.thread_set_name_response_inner(&request_id, params)
.await
{
match self.thread_set_name_response_inner(params).await {
Ok((response, notification)) => {
self.outgoing
.send_response(request_id.clone(), response)
@@ -1335,7 +1332,6 @@ impl ThreadRequestProcessor {
async fn thread_set_name_response_inner(
&self,
request_id: &ConnectionRequestId,
params: ThreadSetNameParams,
) -> Result<(ThreadSetNameResponse, Option<ThreadNameUpdatedNotification>), JSONRPCErrorError>
{
@@ -1347,13 +1343,6 @@ impl ThreadRequestProcessor {
};
let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
if let Ok(thread) = self.thread_manager.get_thread(thread_id).await {
self.submit_core_op(request_id, thread.as_ref(), Op::SetThreadName { name })
.await
.map_err(|err| internal_error(format!("failed to set thread name: {err}")))?;
return Ok((ThreadSetNameResponse {}, None));
}
self.thread_store
.update_thread_metadata(StoreUpdateThreadMetadataParams {
thread_id,
@@ -22,11 +22,7 @@ use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSetNameResponse;
use codex_core::find_thread_name_by_id;
use codex_core::find_thread_path_by_id_str;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
@@ -85,10 +81,6 @@ async fn thread_name_updated_broadcasts_for_loaded_threads() -> Result<()> {
read_notification_for_method(&mut ws2, "thread/name/updated").await?;
assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?;
assert_legacy_thread_name(codex_home.path(), &conversation_id, renamed).await?;
assert_eq!(
thread_name_update_rollout_count(codex_home.path(), &conversation_id).await?,
1
);
assert_no_message(&mut ws1, Duration::from_millis(250)).await?;
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
@@ -141,10 +133,6 @@ async fn thread_name_updated_broadcasts_for_not_loaded_threads() -> Result<()> {
read_notification_for_method(&mut ws2, "thread/name/updated").await?;
assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?;
assert_legacy_thread_name(codex_home.path(), &conversation_id, renamed).await?;
assert_eq!(
thread_name_update_rollout_count(codex_home.path(), &conversation_id).await?,
1
);
assert_no_message(&mut ws1, Duration::from_millis(250)).await?;
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
@@ -206,24 +194,3 @@ async fn assert_legacy_thread_name(
);
Ok(())
}
async fn thread_name_update_rollout_count(
codex_home: &Path,
conversation_id: &str,
) -> Result<usize> {
let rollout_path =
find_thread_path_by_id_str(codex_home, conversation_id, /*state_db_ctx*/ None)
.await?
.context("rollout path")?;
let contents = tokio::fs::read_to_string(rollout_path).await?;
Ok(contents
.lines()
.filter_map(|line| serde_json::from_str::<RolloutLine>(line).ok())
.filter(|line| {
matches!(
line.item,
RolloutItem::EventMsg(EventMsg::ThreadNameUpdated(_))
)
})
.count())
}
-4
View File
@@ -266,10 +266,6 @@ async fn forward_events(
id: _,
msg: EventMsg::SessionConfigured(_),
} => {}
Event {
id: _,
msg: EventMsg::ThreadNameUpdated(_),
} => {}
Event {
id,
msg: EventMsg::ExecApprovalRequest(event),
-79
View File
@@ -52,7 +52,6 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SkillErrorInfo;
use codex_protocol::protocol::SkillsListEntry;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_protocol::protocol::ThreadNameUpdatedEvent;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::WarningEvent;
@@ -763,21 +762,6 @@ pub async fn thread_rollback(sess: &Arc<Session>, sub_id: String, num_turns: u32
.await;
}
async fn persist_thread_name_update(
sess: &Arc<Session>,
event: ThreadNameUpdatedEvent,
) -> anyhow::Result<EventMsg> {
let msg = EventMsg::ThreadNameUpdated(event);
let item = RolloutItem::EventMsg(msg.clone());
let live_thread = sess.live_thread_for_persistence("rename thread")?;
live_thread.persist().await?;
live_thread
.append_items(std::slice::from_ref(&item))
.await?;
live_thread.flush().await?;
Ok(msg)
}
pub(super) async fn persist_thread_memory_mode_update(
sess: &Arc<Session>,
mode: ThreadMemoryMode,
@@ -792,65 +776,6 @@ pub(super) async fn persist_thread_memory_mode_update(
Ok(())
}
/// Persists the thread name in the rollout and state database, updates in-memory state, and
/// emits a `ThreadNameUpdated` event on success.
pub async fn set_thread_name(sess: &Arc<Session>, sub_id: String, name: String) {
let Some(name) = crate::util::normalize_thread_name(&name) else {
let event = Event {
id: sub_id,
msg: EventMsg::Error(ErrorEvent {
message: "Thread name cannot be empty.".to_string(),
codex_error_info: Some(CodexErrorInfo::BadRequest),
}),
};
sess.send_event_raw(event).await;
return;
};
let updated = ThreadNameUpdatedEvent {
thread_id: sess.conversation_id,
thread_name: Some(name.clone()),
};
let msg = match persist_thread_name_update(sess, updated).await {
Ok(msg) => msg,
Err(err) => {
warn!("Failed to persist thread name update to rollout: {err}");
let event = Event {
id: sub_id,
msg: EventMsg::Error(ErrorEvent {
message: err.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
}),
};
sess.send_event_raw(event).await;
return;
}
};
if let Some(state_db) = sess.services.state_db.as_deref()
&& let Err(err) = state_db
.update_thread_title(sess.conversation_id, &name)
.await
{
warn!("Failed to update thread title in state db: {err}");
}
{
let mut state = sess.state.lock().await;
state.session_configuration.thread_name = Some(name.clone());
}
let codex_home = sess.codex_home().await;
if let Err(err) =
crate::rollout::append_thread_name(&codex_home, sess.conversation_id, &name).await
{
warn!("Failed to update legacy thread name index: {err}");
}
sess.deliver_event_raw(Event { id: sub_id, msg }).await;
}
/// Persists thread-level memory mode metadata for the active session.
///
/// This does not involve the model and only affects whether the thread is
@@ -1119,10 +1044,6 @@ pub(super) async fn submission_loop(
thread_rollback(&sess, sub.id.clone(), num_turns).await;
false
}
Op::SetThreadName { name } => {
set_thread_name(&sess, sub.id.clone(), name).await;
false
}
Op::SetThreadMemoryMode { mode } => {
set_thread_memory_mode(&sess, sub.id.clone(), mode).await;
false
-1
View File
@@ -1476,7 +1476,6 @@ pub(super) fn realtime_text_for_event(msg: &EventMsg) -> Option<String> {
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::AgentReasoningSectionBreak(_)
| EventMsg::SessionConfigured(_)
| EventMsg::ThreadNameUpdated(_)
| EventMsg::ThreadGoalUpdated(_)
| EventMsg::McpStartupUpdate(_)
| EventMsg::McpStartupComplete(_)
@@ -318,9 +318,6 @@ async fn run_codex_tool_session_inner(
EventMsg::SessionConfigured(_) => {
tracing::error!("unexpected SessionConfigured event");
}
EventMsg::ThreadNameUpdated(_) => {
// Ignore session metadata updates in MCP tool runner.
}
EventMsg::ThreadGoalUpdated(_) => {
// Ignore thread goal metadata updates in MCP tool runner.
}
-17
View File
@@ -766,11 +766,6 @@ pub enum Op {
/// to generate a summary which will be returned as an AgentMessage event.
Compact,
/// Set a user-facing thread name in the persisted rollout metadata.
/// This is a local-only operation handled by codex-core; it does not
/// involve the model.
SetThreadName { name: String },
/// Set whether the thread remains eligible for memory generation.
///
/// This persists thread-level memory mode metadata without involving the
@@ -903,7 +898,6 @@ impl Op {
Self::ReloadUserConfig => "reload_user_config",
Self::ListSkills { .. } => "list_skills",
Self::Compact => "compact",
Self::SetThreadName { .. } => "set_thread_name",
Self::SetThreadMemoryMode { .. } => "set_thread_memory_mode",
Self::ThreadRollback { .. } => "thread_rollback",
Self::Review { .. } => "review",
@@ -1373,9 +1367,6 @@ pub enum EventMsg {
/// Ack the client's configure message.
SessionConfigured(SessionConfiguredEvent),
/// Updated session metadata (e.g., thread name changes).
ThreadNameUpdated(ThreadNameUpdatedEvent),
/// Updated long-running goal metadata for the thread.
ThreadGoalUpdated(ThreadGoalUpdatedEvent),
@@ -3592,14 +3583,6 @@ impl<'de> Deserialize<'de> for SessionConfiguredEvent {
}
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct ThreadNameUpdatedEvent {
pub thread_id: ThreadId,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub thread_name: Option<String>,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "protocol/")]
@@ -236,7 +236,6 @@ pub(crate) fn tool_runtime_trace_event(event: &EventMsg) -> Option<ToolRuntimeTr
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::AgentReasoningSectionBreak(_)
| EventMsg::SessionConfigured(_)
| EventMsg::ThreadNameUpdated(_)
| EventMsg::McpStartupUpdate(_)
| EventMsg::McpStartupComplete(_)
| EventMsg::WebSearchBegin(_)
@@ -290,7 +289,6 @@ pub(crate) fn wrapped_protocol_event_type(event: &EventMsg) -> Option<&'static s
EventMsg::TurnStarted(_) => Some("turn_started"),
EventMsg::TurnComplete(_) => Some("turn_complete"),
EventMsg::TurnAborted(_) => Some("turn_aborted"),
EventMsg::ThreadNameUpdated(_) => Some("thread_name_updated"),
EventMsg::ThreadRolledBack(_) => Some("thread_rolled_back"),
EventMsg::Error(_) => Some("error"),
EventMsg::Warning(_) => Some("warning"),
-1
View File
@@ -98,7 +98,6 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::TokenCount(_)
| EventMsg::ThreadNameUpdated(_)
| EventMsg::ContextCompacted(_)
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
+1 -30
View File
@@ -33,9 +33,7 @@ pub fn apply_rollout_item(
pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
match item {
RolloutItem::SessionMeta(_) | RolloutItem::TurnContext(_) => true,
RolloutItem::EventMsg(
EventMsg::TokenCount(_) | EventMsg::UserMessage(_) | EventMsg::ThreadNameUpdated(_),
) => true,
RolloutItem::EventMsg(EventMsg::TokenCount(_) | EventMsg::UserMessage(_)) => true,
RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) => {
false
}
@@ -97,13 +95,6 @@ fn apply_event_msg(metadata: &mut ThreadMetadata, event: &EventMsg) {
}
}
}
EventMsg::ThreadNameUpdated(updated) => {
if let Some(title) = updated.thread_name.as_deref()
&& !title.trim().is_empty()
{
metadata.title = title.trim().to_string();
}
}
_ => {}
}
}
@@ -159,7 +150,6 @@ mod tests {
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadNameUpdatedEvent;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::protocol::UserMessageEvent;
@@ -205,25 +195,6 @@ mod tests {
assert_eq!(metadata.title, "actual user request");
}
#[test]
fn thread_name_update_replaces_title_without_changing_first_user_message() {
let mut metadata = metadata_for_test();
metadata.title = "actual user request".to_string();
metadata.first_user_message = Some("actual user request".to_string());
let item = RolloutItem::EventMsg(EventMsg::ThreadNameUpdated(ThreadNameUpdatedEvent {
thread_id: metadata.id,
thread_name: Some("saved-session".to_string()),
}));
apply_rollout_item(&mut metadata, &item, "test-provider");
assert_eq!(
metadata.first_user_message.as_deref(),
Some("actual user request")
);
assert_eq!(metadata.title, "saved-session");
}
#[test]
fn event_msg_image_only_user_message_sets_image_placeholder_preview() {
let mut metadata = metadata_for_test();
@@ -2,11 +2,9 @@ use std::path::Path;
use std::path::PathBuf;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GitInfo;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_protocol::protocol::ThreadNameUpdatedEvent;
use codex_rollout::ARCHIVED_SESSIONS_SUBDIR;
use codex_rollout::append_rollout_item_to_path;
use codex_rollout::append_thread_name;
@@ -50,10 +48,8 @@ pub(super) async fn update_thread_metadata(
}
let resolved_rollout_path =
resolve_rollout_path(store, thread_id, params.include_archived).await?;
let name = params.patch.name;
let git_info = params.patch.git_info;
if let Some(name) = params.patch.name {
apply_thread_name(store, resolved_rollout_path.path.as_path(), thread_id, name).await?;
}
if let Some(memory_mode) = params.patch.memory_mode {
apply_thread_memory_mode(resolved_rollout_path.path.as_path(), thread_id, memory_mode)
.await?;
@@ -70,6 +66,10 @@ pub(super) async fn update_thread_metadata(
)
.await;
if let Some(name) = name {
apply_thread_name(store, thread_id, name).await?;
}
let resolved_git_info = match git_info {
Some(git_info) => {
let state_db = store.state_db();
@@ -229,20 +229,22 @@ async fn apply_thread_git_info_to_rollout(
async fn apply_thread_name(
store: &LocalThreadStore,
rollout_path: &Path,
thread_id: ThreadId,
name: String,
) -> ThreadStoreResult<()> {
let item = RolloutItem::EventMsg(EventMsg::ThreadNameUpdated(ThreadNameUpdatedEvent {
thread_id,
thread_name: Some(name.clone()),
}));
append_rollout_item_to_path(rollout_path, &item)
let updated = store
.state_db()
.update_thread_title(thread_id, &name)
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to set thread name: {err}"),
})?;
if !updated {
return Err(ThreadStoreError::Internal {
message: format!("thread metadata unavailable before name update: {thread_id}"),
});
}
append_thread_name(store.config.codex_home.as_path(), thread_id, &name)
.await
.map_err(|err| ThreadStoreError::Internal {
@@ -369,8 +371,7 @@ mod tests {
let store = test_store(home.path()).await;
let uuid = Uuid::from_u128(301);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let path =
write_session_file(home.path(), "2025-01-03T14-00-00", uuid).expect("session file");
write_session_file(home.path(), "2025-01-03T14-00-00", uuid).expect("session file");
let thread = store
.update_thread_metadata(UpdateThreadMetadataParams {
@@ -390,11 +391,13 @@ mod tests {
.expect("find thread name");
assert_eq!(latest_name.as_deref(), Some("A sharper name"));
let appended = last_rollout_item(path.as_path());
assert_eq!(appended["type"], "event_msg");
assert_eq!(appended["payload"]["type"], "thread_name_updated");
assert_eq!(appended["payload"]["thread_id"], thread_id.to_string());
assert_eq!(appended["payload"]["thread_name"], "A sharper name");
let metadata = store
.state_db()
.get_thread(thread_id)
.await
.expect("get metadata")
.expect("metadata");
assert_eq!(metadata.title, "A sharper name");
}
#[tokio::test]