From fac3158c2a783095768076489815f361fa9b0db4 Mon Sep 17 00:00:00 2001 From: Jeremy Rose <172423086+nornagon-openai@users.noreply.github.com> Date: Tue, 16 Jun 2026 17:06:22 -0700 Subject: [PATCH] Add thread recencyAt for sidebar ordering (#27910) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Add a server-owned `recencyAt` timestamp and `recency_at` thread-list sort key for product recency ordering while preserving the existing meaning of `updatedAt` as the latest persisted thread mutation. This is the server-side alternative to #27697. Rather than narrowing `updatedAt`, clients can sort the sidebar by `recency_at` and continue treating `updatedAt` as mutation time. Paired Codex Apps PR: [openai/openai#1024599](https://github.com/openai/openai/pull/1024599) ## Contract - `recencyAt` initializes when a thread is created. - A turn start advances `recencyAt` monotonically. - Commentary, agent output, tool results, token/accounting updates, turn completion, archive, unarchive, resume, and generic metadata writes do not advance it. - `updatedAt` retains its existing behavior and continues to advance for persisted thread mutations. - Current servers populate `recencyAt`; the response field is optional in generated TypeScript so clients connected to older servers can fall back to `updatedAt`. - Filesystem-only fallback uses existing updated/mtime ordering when SQLite is unavailable. ## Persistence and compatibility Migration 0038 adds second- and millisecond-precision recency columns, backfills them from the existing updated timestamp, creates list indexes, and includes an insert trigger so older binaries writing to a migrated database seed recency without causing later mutations to advance it. Generic metadata upserts preserve existing recency values. Turn-start updates use a dedicated monotonic touch, and process-local allocation keeps millisecond cursor values unique. State DB list, search, read, filtered-list repair, rollout fallback propagation, and app-server conversions all carry the new field. ## API `Thread` responses include: ```ts recencyAt?: number ``` `thread/list` and `thread/search` accept: ```json { "sortKey": "recency_at" } ``` Generated TypeScript and JSON schemas are included. ## Validation - `just test -p codex-state` — 146 passed - `just test -p codex-rollout` — 69 passed - `just test -p codex-thread-store` — 81 passed - `just test -p codex-app-server-protocol` — 231 passed - Focused app-server list ordering, response mapping, archive/unarchive, and resume lifecycle tests passed - Scoped `just fix` for state, rollout, thread-store, app-server-protocol, and app-server - `just fmt` - `git diff --check` - Independent correctness, simplicity, elegance, security, and test-quality reviews; actionable ordering, lifecycle, query-projection, and timestamp-uniqueness findings were addressed --- .../analytics/src/analytics_client_tests.rs | 1 + codex-rs/analytics/src/client_tests.rs | 1 + .../schema/json/ClientRequest.json | 3 +- .../schema/json/ServerNotification.json | 8 + .../codex_app_server_protocol.schemas.json | 11 +- .../codex_app_server_protocol.v2.schemas.json | 11 +- .../schema/json/v2/ThreadForkResponse.json | 8 + .../schema/json/v2/ThreadListParams.json | 3 +- .../schema/json/v2/ThreadListResponse.json | 8 + .../json/v2/ThreadMetadataUpdateResponse.json | 8 + .../schema/json/v2/ThreadReadResponse.json | 8 + .../schema/json/v2/ThreadResumeResponse.json | 8 + .../json/v2/ThreadRollbackResponse.json | 8 + .../schema/json/v2/ThreadStartResponse.json | 8 + .../json/v2/ThreadStartedNotification.json | 8 + .../json/v2/ThreadUnarchiveResponse.json | 8 + .../schema/typescript/v2/Thread.ts | 4 + .../schema/typescript/v2/ThreadSortKey.ts | 2 +- .../src/protocol/common.rs | 2 + .../src/protocol/v2/tests.rs | 10 + .../src/protocol/v2/thread.rs | 1 + .../src/protocol/v2/thread_data.rs | 3 + codex-rs/app-server/README.md | 7 +- .../app-server/src/bespoke_event_handling.rs | 1 + .../request_processors/thread_processor.rs | 5 + .../thread_processor_tests.rs | 1 + .../thread_resume_redaction.rs | 1 + .../src/request_processors/thread_summary.rs | 1 + codex-rs/app-server/src/thread_status.rs | 1 + .../app-server/tests/suite/v2/thread_list.rs | 83 +++++ .../tests/suite/v2/thread_resume.rs | 26 +- codex-rs/core/src/realtime_context_tests.rs | 4 + codex-rs/exec/src/lib_tests.rs | 2 + codex-rs/rollout/src/list.rs | 71 ++-- codex-rs/rollout/src/metadata.rs | 1 + codex-rs/rollout/src/metadata_tests.rs | 1 + codex-rs/rollout/src/recorder.rs | 67 +++- codex-rs/rollout/src/recorder_tests.rs | 3 + codex-rs/rollout/src/state_db.rs | 7 +- codex-rs/rollout/src/state_db_tests.rs | 16 + codex-rs/rollout/src/tests.rs | 11 + .../migrations/0038_threads_recency_at.sql | 28 ++ codex-rs/state/src/extract.rs | 1 + codex-rs/state/src/migrations.rs | 4 + codex-rs/state/src/migrations_tests.rs | 129 +++++++ codex-rs/state/src/model/thread_metadata.rs | 26 +- codex-rs/state/src/runtime.rs | 44 ++- codex-rs/state/src/runtime/memories.rs | 2 + codex-rs/state/src/runtime/test_support.rs | 1 + codex-rs/state/src/runtime/threads.rs | 345 ++++++++++++++++-- codex-rs/thread-store/src/in_memory.rs | 3 + .../thread-store/src/local/archive_thread.rs | 1 + codex-rs/thread-store/src/local/helpers.rs | 2 + .../thread-store/src/local/list_threads.rs | 1 + codex-rs/thread-store/src/local/mod.rs | 91 +++++ .../thread-store/src/local/read_thread.rs | 9 + .../thread-store/src/local/search_threads.rs | 16 +- .../src/local/search_threads_tests.rs | 29 ++ .../src/local/unarchive_thread.rs | 1 + .../src/local/update_thread_metadata.rs | 18 + .../thread-store/src/thread_metadata_sync.rs | 31 +- codex-rs/thread-store/src/types.rs | 10 + codex-rs/tui/src/app/loaded_threads.rs | 1 + codex-rs/tui/src/app/tests.rs | 4 + codex-rs/tui/src/app/thread_session_state.rs | 1 + codex-rs/tui/src/app_server_session.rs | 1 + codex-rs/tui/src/resume_picker.rs | 12 +- 67 files changed, 1153 insertions(+), 99 deletions(-) create mode 100644 codex-rs/state/migrations/0038_threads_recency_at.sql create mode 100644 codex-rs/state/src/migrations_tests.rs create mode 100644 codex-rs/thread-store/src/local/search_threads_tests.rs diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 7c634ce37..b18b253b4 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -178,6 +178,7 @@ fn sample_thread_with_metadata( model_provider: "openai".to_string(), created_at: 1, updated_at: 2, + recency_at: Some(2), status: AppServerThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp").abs(), diff --git a/codex-rs/analytics/src/client_tests.rs b/codex-rs/analytics/src/client_tests.rs index 1835932e8..18bb8c215 100644 --- a/codex-rs/analytics/src/client_tests.rs +++ b/codex-rs/analytics/src/client_tests.rs @@ -285,6 +285,7 @@ fn sample_thread(thread_id: &str) -> Thread { model_provider: "openai".to_string(), created_at: 1, updated_at: 2, + recency_at: Some(2), status: AppServerThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp").abs(), diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 3358ea9d6..e7ead5395 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -4158,7 +4158,8 @@ "ThreadSortKey": { "enum": [ "created_at", - "updated_at" + "updated_at", + "recency_at" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index d64befa0a..a0d49cc54 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -3582,6 +3582,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 1e11c76ae..bbf68eb31 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -16621,6 +16621,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" @@ -18880,7 +18888,8 @@ "ThreadSortKey": { "enum": [ "created_at", - "updated_at" + "updated_at", + "recency_at" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 326bfdfc9..238be0011 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -14429,6 +14429,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" @@ -16688,7 +16696,8 @@ "ThreadSortKey": { "enum": [ "created_at", - "updated_at" + "updated_at", + "recency_at" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json index 4a666672b..2b7caeca7 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json @@ -1055,6 +1055,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListParams.json index 789d9b61f..11f7da47d 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListParams.json @@ -24,7 +24,8 @@ "ThreadSortKey": { "enum": [ "created_at", - "updated_at" + "updated_at", + "recency_at" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json index c1d1b6759..0caa0daf5 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json @@ -870,6 +870,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json index 801d1cf13..a98d3586e 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json @@ -870,6 +870,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json index 01b5ed907..51e56f14b 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json @@ -870,6 +870,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json index 032d1dd74..e8c309507 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json @@ -1055,6 +1055,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json index c1ad53475..52cba55ed 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json @@ -870,6 +870,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json index 33373b9f7..245cc946d 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json @@ -1055,6 +1055,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json index 577df57e0..bf497ecfa 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json @@ -870,6 +870,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json index fcc42e626..14f25cfcd 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json @@ -870,6 +870,14 @@ "description": "Usually the first user message in the thread, if available.", "type": "string" }, + "recencyAt": { + "description": "Unix timestamp (in seconds) used for thread recency ordering.", + "format": "int64", + "type": [ + "integer", + "null" + ] + }, "sessionId": { "description": "Session id shared by threads that belong to the same session tree.", "type": "string" diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/Thread.ts b/codex-rs/app-server-protocol/schema/typescript/v2/Thread.ts index 5fa30b64e..1c288ccbd 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/Thread.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/Thread.ts @@ -41,6 +41,10 @@ createdAt: number, * Unix timestamp (in seconds) when the thread was last updated. */ updatedAt: number, +/** + * Unix timestamp (in seconds) used for thread recency ordering. + */ +recencyAt: number | null, /** * Current runtime status for the thread. */ diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSortKey.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSortKey.ts index dbf1b6c40..d93f1c47b 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSortKey.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadSortKey.ts @@ -2,4 +2,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export type ThreadSortKey = "created_at" | "updated_at"; +export type ThreadSortKey = "created_at" | "updated_at" | "recency_at"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 7611c27ff..3e9c0aade 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -2501,6 +2501,7 @@ mod tests { model_provider: "openai".to_string(), created_at: 1, updated_at: 2, + recency_at: Some(3), status: v2::ThreadStatus::Idle, path: None, cwd: cwd.clone(), @@ -2544,6 +2545,7 @@ mod tests { "modelProvider": "openai", "createdAt": 1, "updatedAt": 2, + "recencyAt": 3, "status": { "type": "idle" }, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs index de138d5c2..39bc52717 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs @@ -174,6 +174,7 @@ fn thread_resume_response_round_trips_initial_turns_page() { model_provider: "openai".to_string(), created_at: 1, updated_at: 1, + recency_at: Some(1), status: ThreadStatus::Idle, path: None, cwd: absolute_path("tmp"), @@ -3600,6 +3601,7 @@ fn thread_lifecycle_responses_default_missing_optional_fields() { assert_eq!(start.instruction_sources, Vec::::new()); assert_eq!(start.thread.parent_thread_id, None); + assert_eq!(start.thread.recency_at, None); assert_eq!(resume.instruction_sources, Vec::::new()); assert_eq!(fork.instruction_sources, Vec::::new()); assert_eq!(start.active_permission_profile, None); @@ -3608,6 +3610,14 @@ fn thread_lifecycle_responses_default_missing_optional_fields() { assert_eq!(fork.active_permission_profile, None); } +#[test] +fn thread_recency_sort_key_serializes_as_snake_case() { + assert_eq!( + serde_json::to_value(ThreadSortKey::RecencyAt).expect("sort key should serialize"), + json!("recency_at") + ); +} + #[test] fn turn_start_params_preserve_explicit_null_service_tier() { let params: TurnStartParams = serde_json::from_value(json!({ diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index 0252c4b04..b5809f09b 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -1086,6 +1086,7 @@ pub enum ThreadSourceKind { pub enum ThreadSortKey { CreatedAt, UpdatedAt, + RecencyAt, } #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs index fc0762361..52a8d7017 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs @@ -152,6 +152,9 @@ pub struct Thread { /// Unix timestamp (in seconds) when the thread was last updated. #[ts(type = "number")] pub updated_at: i64, + /// Unix timestamp (in seconds) used for thread recency ordering. + #[ts(type = "number | null")] + pub recency_at: Option, /// Current runtime status for the thread. pub status: ThreadStatus, /// [UNSTABLE] Path to the thread on disk. diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 336f769d7..8d470b828 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -364,7 +364,8 @@ Like `thread/resume`, experimental clients can pass `excludeTurns: true` to `thr - `cursor` — opaque string from a prior response; omit for the first page. - `limit` — server defaults to a reasonable page size if unset. -- `sortKey` — `created_at` (default) or `updated_at`. +- `sortKey` — `created_at` (default), `updated_at`, or `recency_at`. +- `recencyAt` is initialized when the thread is created and advances when a turn starts. Unlike `updatedAt`, background output and other persisted mutations do not advance it. - `sortDirection` — `desc` (default) or `asc`. - `modelProviders` — restrict results to specific providers; unset, null, or an empty array will include all providers. - `sourceKinds` — restrict results to specific sources; omit or pass `[]` for interactive sessions only (`cli`, `vscode`). @@ -386,8 +387,8 @@ Example: } } { "id": 20, "result": { "data": [ - { "id": "thr_a", "preview": "Create a TUI", "modelProvider": "openai", "createdAt": 1730831111, "updatedAt": 1730831111, "status": { "type": "notLoaded" }, "agentNickname": "Atlas", "agentRole": "explorer" }, - { "id": "thr_b", "preview": "Fix tests", "modelProvider": "openai", "createdAt": 1730750000, "updatedAt": 1730750000, "status": { "type": "notLoaded" } } + { "id": "thr_a", "preview": "Create a TUI", "modelProvider": "openai", "createdAt": 1730831111, "updatedAt": 1730831111, "recencyAt": 1730831111, "status": { "type": "notLoaded" }, "agentNickname": "Atlas", "agentRole": "explorer" }, + { "id": "thr_b", "preview": "Fix tests", "modelProvider": "openai", "createdAt": 1730750000, "updatedAt": 1730750000, "recencyAt": 1730750000, "status": { "type": "notLoaded" } } ], "nextCursor": "opaque-token-or-null", "backwardsCursor": "opaque-token-or-null" diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 868cef81f..f71401e95 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -2259,6 +2259,7 @@ mod tests { reasoning_effort: None, created_at, updated_at: created_at, + recency_at: created_at, archived_at: None, cwd: test_path_buf("/tmp").abs().into(), cli_version: "0.0.0".to_string(), diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index a5299f2c7..907b05a03 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1897,6 +1897,7 @@ impl ThreadRequestProcessor { let store_sort_key = match sort_key.unwrap_or(ThreadSortKey::CreatedAt) { ThreadSortKey::CreatedAt => StoreThreadSortKey::CreatedAt, ThreadSortKey::UpdatedAt => StoreThreadSortKey::UpdatedAt, + ThreadSortKey::RecencyAt => StoreThreadSortKey::RecencyAt, }; let sort_direction = sort_direction.unwrap_or(SortDirection::Desc); let (stored_threads, next_cursor) = self @@ -1978,6 +1979,7 @@ impl ThreadRequestProcessor { let store_sort_key = match sort_key.unwrap_or(ThreadSortKey::CreatedAt) { ThreadSortKey::CreatedAt => StoreThreadSortKey::CreatedAt, ThreadSortKey::UpdatedAt => StoreThreadSortKey::UpdatedAt, + ThreadSortKey::RecencyAt => StoreThreadSortKey::RecencyAt, }; let store_sort_direction = sort_direction.unwrap_or(SortDirection::Desc); let (allowed_sources, source_kind_filter) = compute_source_filters(source_kinds); @@ -3695,6 +3697,7 @@ fn thread_backwards_cursor_for_sort_key( let timestamp = match sort_key { StoreThreadSortKey::CreatedAt => thread.created_at, StoreThreadSortKey::UpdatedAt => thread.updated_at, + StoreThreadSortKey::RecencyAt => thread.recency_at, }; // The state DB stores unique millisecond timestamps. Offset the reverse cursor by one // millisecond so the opposite-direction query includes the page anchor. @@ -4139,6 +4142,7 @@ pub(crate) fn thread_from_stored_thread( }, created_at: thread.created_at.timestamp(), updated_at: thread.updated_at.timestamp(), + recency_at: Some(thread.recency_at.timestamp()), status: ThreadStatus::NotLoaded, path, cwd, @@ -4344,6 +4348,7 @@ fn build_thread_from_snapshot( model_provider: config_snapshot.model_provider_id.clone(), created_at: now, updated_at: now, + recency_at: Some(now), status: ThreadStatus::NotLoaded, path, cwd: config_snapshot.cwd().clone(), diff --git a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs index 42711e857..1f333f1eb 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs @@ -478,6 +478,7 @@ mod thread_processor_behavior_tests { reasoning_effort: None, created_at: created_at.with_timezone(&Utc), updated_at: updated_at.with_timezone(&Utc), + recency_at: updated_at.with_timezone(&Utc), archived_at: None, cwd: PathBuf::from("/tmp"), cli_version: "0.0.0".to_string(), diff --git a/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs b/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs index e970a88a9..110d6d5cb 100644 --- a/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs +++ b/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs @@ -178,6 +178,7 @@ mod tests { model_provider: "mock_provider".to_string(), created_at: 0, updated_at: 0, + recency_at: Some(0), status: ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp").abs(), diff --git a/codex-rs/app-server/src/request_processors/thread_summary.rs b/codex-rs/app-server/src/request_processors/thread_summary.rs index 0fb132005..67d2d9eab 100644 --- a/codex-rs/app-server/src/request_processors/thread_summary.rs +++ b/codex-rs/app-server/src/request_processors/thread_summary.rs @@ -320,6 +320,7 @@ pub(crate) fn summary_to_thread( model_provider, created_at: created_at.map(|dt| dt.timestamp()).unwrap_or(0), updated_at: updated_at.map(|dt| dt.timestamp()).unwrap_or(0), + recency_at: updated_at.map(|dt| dt.timestamp()), status: ThreadStatus::NotLoaded, path: (!path.as_os_str().is_empty()).then_some(path), cwd, diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index 6d66a32d6..a5997a58f 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -897,6 +897,7 @@ mod tests { model_provider: "mock-provider".to_string(), created_at: 0, updated_at: 0, + recency_at: Some(0), status: ThreadStatus::NotLoaded, path: None, cwd: test_path_buf("/tmp").abs(), diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 3f6adc167..9bdff3bb8 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -1689,6 +1689,89 @@ async fn thread_list_sort_updated_at_orders_by_mtime() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_list_sort_recency_at_uses_state_db_order_with_provider_filter() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + let id_old = create_fake_rollout( + codex_home.path(), + "2025-01-01T10-00-00", + "2025-01-01T10:00:00Z", + "Hello", + Some("mock_provider"), + /*git_info*/ None, + )?; + let id_new = create_fake_rollout( + codex_home.path(), + "2025-01-01T11-00-00", + "2025-01-01T11:00:00Z", + "Hello", + Some("mock_provider"), + /*git_info*/ None, + )?; + set_rollout_mtime( + rollout_path(codex_home.path(), "2025-01-01T10-00-00", &id_old).as_path(), + "2025-01-03T00:00:00Z", + )?; + + let state_db = + codex_state::StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()) + .await?; + state_db + .mark_backfill_complete(/*last_watermark*/ None) + .await?; + let rollout_config = codex_rollout::RolloutConfig { + codex_home: codex_home.path().to_path_buf(), + sqlite_home: codex_home.path().to_path_buf(), + cwd: codex_home.path().to_path_buf(), + model_provider_id: "mock_provider".to_string(), + generate_memories: false, + }; + codex_core::RolloutRecorder::list_threads( + Some(state_db.clone()), + &rollout_config, + /*page_size*/ 10, + /*cursor*/ None, + codex_core::ThreadSortKey::CreatedAt, + codex_core::SortDirection::Desc, + codex_core::INTERACTIVE_SESSION_SOURCES.as_slice(), + /*model_providers*/ None, + /*cwd_filters*/ None, + "mock_provider", + /*search_term*/ None, + ) + .await?; + state_db + .touch_thread_recency_at( + ThreadId::from_string(&id_new)?, + DateTime::::from_timestamp(1_800_000_000, 0).expect("timestamp"), + ) + .await?; + + let mut mcp = init_mcp(codex_home.path()).await?; + let ThreadListResponse { data, .. } = list_threads_with_sort( + &mut mcp, + /*cursor*/ None, + Some(10), + Some(vec!["mock_provider".to_string()]), + /*source_kinds*/ None, + Some(ThreadSortKey::RecencyAt), + /*archived*/ None, + ) + .await?; + + assert_eq!( + data.iter() + .map(|thread| thread.id.as_str()) + .collect::>(), + vec![id_new.as_str(), id_old.as_str()] + ); + assert!(data.iter().all(|thread| thread.recency_at.is_some())); + + Ok(()) +} + #[tokio::test] async fn thread_list_updated_at_paginates_with_cursor() -> Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index e67184bf0..03e30e373 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -2200,6 +2200,7 @@ async fn thread_resume_defers_updated_at_until_turn_start() -> Result<()> { let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; assert_eq!(thread.updated_at, before_resume.updated_at); + assert_eq!(thread.recency_at, before_resume.recency_at); assert_eq!(thread.status, ThreadStatus::Idle); let after_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?; @@ -2234,7 +2235,7 @@ async fn thread_resume_defers_updated_at_until_turn_start() -> Result<()> { let turn_id = mcp .send_turn_start_request(TurnStartParams { - thread_id, + thread_id: thread_id.clone(), input: vec![UserInput::Text { text: "Hello".to_string(), text_elements: Vec::new(), @@ -2247,6 +2248,29 @@ async fn thread_resume_defers_updated_at_until_turn_start() -> Result<()> { mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), ) .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/started"), + ) + .await??; + + let read_id = mcp + .send_thread_read_request(ThreadReadParams { + thread_id: thread_id.clone(), + include_turns: false, + }) + .await?; + let read_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(read_id)), + ) + .await??; + let ThreadReadResponse { + thread: after_turn_start, + .. + } = to_response::(read_resp)?; + assert!(after_turn_start.recency_at > before_resume.recency_at); + timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_notification_message("turn/completed"), diff --git a/codex-rs/core/src/realtime_context_tests.rs b/codex-rs/core/src/realtime_context_tests.rs index f04282a65..352d6ae8e 100644 --- a/codex-rs/core/src/realtime_context_tests.rs +++ b/codex-rs/core/src/realtime_context_tests.rs @@ -47,6 +47,10 @@ fn stored_thread(cwd: &str, title: &str, first_user_message: &str) -> StoredThre .timestamp_opt(1_709_251_200, 0) .single() .expect("valid timestamp"), + recency_at: Utc + .timestamp_opt(1_709_251_200, 0) + .single() + .expect("valid timestamp"), archived_at: None, cwd: PathBuf::from(cwd), cli_version: "test".to_string(), diff --git a/codex-rs/exec/src/lib_tests.rs b/codex-rs/exec/src/lib_tests.rs index 6b27d0111..526c5fc48 100644 --- a/codex-rs/exec/src/lib_tests.rs +++ b/codex-rs/exec/src/lib_tests.rs @@ -342,6 +342,7 @@ fn turn_items_for_thread_returns_matching_turn_items() { model_provider: "openai".to_string(), created_at: 0, updated_at: 0, + recency_at: Some(0), status: codex_app_server_protocol::ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp/project").abs(), @@ -760,6 +761,7 @@ fn sample_thread_start_response() -> ThreadStartResponse { model_provider: "openai".to_string(), created_at: 0, updated_at: 0, + recency_at: Some(0), status: codex_app_server_protocol::ThreadStatus::Idle, path: Some(PathBuf::from("/tmp/rollout.jsonl")), cwd: test_path_buf("/tmp").abs(), diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index 845145aae..f5013e7aa 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -77,6 +77,8 @@ pub struct ThreadItem { pub created_at: Option, /// RFC3339 timestamp string for the most recent update (from file mtime). pub updated_at: Option, + /// RFC3339 timestamp string used for product recency ordering. + pub recency_at: Option, } #[allow(dead_code)] @@ -115,6 +117,7 @@ const USER_EVENT_SCAN_LIMIT: usize = 200; pub enum ThreadSortKey { CreatedAt, UpdatedAt, + RecencyAt, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -137,20 +140,29 @@ pub struct ThreadListConfig<'a> { pub layout: ThreadListLayout, } -/// Pagination cursor identifying the timestamp of the last item in a page. +/// Pagination cursor identifying the last item in a page. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Cursor { ts: OffsetDateTime, + id: Option, } impl Cursor { - fn new(ts: OffsetDateTime) -> Self { - Self { ts } + pub(crate) fn new(ts: OffsetDateTime) -> Self { + Self { ts, id: None } + } + + pub(crate) fn with_thread_id(ts: OffsetDateTime, id: ThreadId) -> Self { + Self { ts, id: Some(id) } } pub(crate) fn timestamp(&self) -> OffsetDateTime { self.ts } + + pub(crate) fn thread_id(&self) -> Option { + self.id + } } /// Keeps track of where a paginated listing left off. As the file scan goes newest -> oldest, @@ -287,7 +299,10 @@ impl serde::Serialize for Cursor { .ts .format(&Rfc3339) .map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?; - serializer.serialize_str(&ts_str) + match self.id { + Some(id) => serializer.serialize_str(&format!("{ts_str}|{id}")), + None => serializer.serialize_str(&ts_str), + } } } @@ -308,7 +323,7 @@ impl From for Cursor { .timestamp_nanos_opt() .and_then(|nanos| OffsetDateTime::from_unix_timestamp_nanos(nanos as i128).ok()) .unwrap_or(OffsetDateTime::UNIX_EPOCH); - Self::new(ts) + Self { ts, id: anchor.id } } } @@ -419,7 +434,7 @@ async fn traverse_directories_for_paths( ) .await } - ThreadSortKey::UpdatedAt => { + ThreadSortKey::UpdatedAt | ThreadSortKey::RecencyAt => { traverse_directories_for_paths_updated( root, page_size, @@ -454,7 +469,7 @@ async fn traverse_flat_paths( ) .await } - ThreadSortKey::UpdatedAt => { + ThreadSortKey::UpdatedAt | ThreadSortKey::RecencyAt => { traverse_flat_paths_updated( root, page_size, @@ -702,35 +717,48 @@ async fn traverse_flat_paths_updated( }) } -/// Pagination cursor token format: an RFC3339 timestamp. +/// Pagination cursor token format: an RFC3339 timestamp with an optional thread ID tie-breaker. pub fn parse_cursor(token: &str) -> Option { - if token.contains('|') { - return None; - } + let (timestamp, id) = match token.rsplit_once('|') { + Some((timestamp, id)) => (timestamp, Some(ThreadId::from_string(id).ok()?)), + None => (token, None), + }; - let ts = OffsetDateTime::parse(token, &Rfc3339).ok().or_else(|| { - let format: &[FormatItem] = - format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]"); - PrimitiveDateTime::parse(token, format) - .ok() - .map(PrimitiveDateTime::assume_utc) - })?; + let ts = OffsetDateTime::parse(timestamp, &Rfc3339) + .ok() + .or_else(|| { + let format: &[FormatItem] = + format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]"); + PrimitiveDateTime::parse(timestamp, format) + .ok() + .map(PrimitiveDateTime::assume_utc) + })?; - Some(Cursor::new(ts)) + Some(Cursor { ts, id }) } fn build_next_cursor(items: &[ThreadItem], sort_key: ThreadSortKey) -> Option { let last = items.last()?; let file_name = last.path.file_name()?.to_string_lossy(); - let (created_ts, _id) = parse_timestamp_uuid_from_filename(&file_name)?; + let (created_ts, id) = parse_timestamp_uuid_from_filename(&file_name)?; let ts = match sort_key { ThreadSortKey::CreatedAt => created_ts, ThreadSortKey::UpdatedAt => { let updated_at = last.updated_at.as_deref()?; OffsetDateTime::parse(updated_at, &Rfc3339).ok()? } + ThreadSortKey::RecencyAt => { + let recency_at = last.recency_at.as_deref().or(last.updated_at.as_deref())?; + OffsetDateTime::parse(recency_at, &Rfc3339).ok()? + } }; - Some(Cursor::new(ts)) + match sort_key { + ThreadSortKey::RecencyAt => Some(Cursor::with_thread_id( + ts, + ThreadId::from_string(&id.to_string()).ok()?, + )), + ThreadSortKey::CreatedAt | ThreadSortKey::UpdatedAt => Some(Cursor::new(ts)), + } } async fn build_thread_item( @@ -806,6 +834,7 @@ async fn build_thread_item( model_provider, cli_version, created_at, + recency_at: summary_updated_at.clone(), updated_at: summary_updated_at, }); } diff --git a/codex-rs/rollout/src/metadata.rs b/codex-rs/rollout/src/metadata.rs index 6eabd21ae..61d89f09d 100644 --- a/codex-rs/rollout/src/metadata.rs +++ b/codex-rs/rollout/src/metadata.rs @@ -115,6 +115,7 @@ pub async fn extract_metadata_from_rollout( } if let Some(updated_at) = file_modified_time_utc(rollout_path).await { metadata.updated_at = updated_at; + metadata.recency_at = updated_at; } Ok(ExtractionOutcome { metadata, diff --git a/codex-rs/rollout/src/metadata_tests.rs b/codex-rs/rollout/src/metadata_tests.rs index 43aa62e64..e28ae84df 100644 --- a/codex-rs/rollout/src/metadata_tests.rs +++ b/codex-rs/rollout/src/metadata_tests.rs @@ -72,6 +72,7 @@ async fn extract_metadata_from_rollout_uses_session_meta() { apply_rollout_item(&mut expected, &rollout_line.item, "openai") .expect("rollout item should apply"); expected.updated_at = file_modified_time_utc(&path).await.expect("mtime"); + expected.recency_at = expected.updated_at; assert_eq!(outcome.metadata, expected); assert_eq!(outcome.memory_mode, None); diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index a3e5c0514..113ee372e 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -543,6 +543,27 @@ impl RolloutRecorder { ) .await; } + if sort_key == ThreadSortKey::RecencyAt { + if let Some(repaired_db_page) = state_db::list_threads_db( + state_db_ctx.as_deref(), + codex_home, + page_size, + cursor, + sort_key, + sort_direction, + allowed_sources, + model_providers, + cwd_filters, + /*parent_thread_id*/ None, + archived, + search_term, + ) + .await + { + return Ok(repaired_db_page.into()); + } + return Ok(db_page.into()); + } codex_state::record_fallback( "list_threads", "metadata_filter", @@ -985,6 +1006,11 @@ fn truncate_fs_page( let cursor_token = match sort_key { ThreadSortKey::CreatedAt => created_at.format(&Rfc3339).ok()?, ThreadSortKey::UpdatedAt => item.updated_at.as_deref()?.to_string(), + ThreadSortKey::RecencyAt => item + .recency_at + .as_deref() + .or(item.updated_at.as_deref())? + .to_string(), }; parse_cursor(cursor_token.as_str()) }); @@ -1049,6 +1075,7 @@ fn fill_missing_thread_item_metadata(item: &mut ThreadItem, state_item: ThreadIt cli_version, created_at, updated_at, + recency_at, } = state_item; if item.first_user_message.is_none() { @@ -1093,6 +1120,9 @@ fn fill_missing_thread_item_metadata(item: &mut ThreadItem, state_item: ThreadIt if item.updated_at.is_none() { item.updated_at = updated_at; } + if recency_at.is_some() { + item.recency_at = recency_at; + } } #[allow(clippy::too_many_arguments)] @@ -1269,9 +1299,20 @@ async fn list_threads_from_files_asc( .collect::>(); if let Some(cursor) = cursor { - let anchor = cursor.timestamp(); - all_items - .retain(|item| thread_item_sort_key(item, sort_key).is_some_and(|key| key.0 > anchor)); + let anchor = ( + cursor.timestamp(), + cursor + .thread_id() + .and_then(|id| uuid::Uuid::parse_str(&id.to_string()).ok()), + ); + all_items.retain(|item| { + thread_item_sort_key(item, sort_key).is_some_and(|key| match anchor.1 { + Some(anchor_id) if sort_key == ThreadSortKey::RecencyAt => { + key > (anchor.0, anchor_id) + } + _ => key.0 > anchor.0, + }) + }); } let more_matches_available = all_items.len() > page_size || reached_scan_cap; @@ -1329,14 +1370,27 @@ fn thread_item_sort_key( let updated_at = item.updated_at.as_deref().or(item.created_at.as_deref())?; OffsetDateTime::parse(updated_at, &Rfc3339).ok()? } + ThreadSortKey::RecencyAt => { + let recency_at = item + .recency_at + .as_deref() + .or(item.updated_at.as_deref()) + .or(item.created_at.as_deref())?; + OffsetDateTime::parse(recency_at, &Rfc3339).ok()? + } }; Some((timestamp, id)) } fn cursor_from_thread_item(item: &ThreadItem, sort_key: ThreadSortKey) -> Option { - let (timestamp, _id) = thread_item_sort_key(item, sort_key)?; - let cursor_token = timestamp.format(&Rfc3339).ok()?; - parse_cursor(cursor_token.as_str()) + let (timestamp, id) = thread_item_sort_key(item, sort_key)?; + match sort_key { + ThreadSortKey::RecencyAt => Some(Cursor::with_thread_id( + timestamp, + ThreadId::from_string(&id.to_string()).ok()?, + )), + ThreadSortKey::CreatedAt | ThreadSortKey::UpdatedAt => Some(Cursor::new(timestamp)), + } } struct LogFileInfo { @@ -1725,6 +1779,7 @@ fn thread_item_from_state_metadata(item: codex_state::ThreadMetadata) -> ThreadI cli_version: Some(item.cli_version), created_at: Some(item.created_at.to_rfc3339_opts(SecondsFormat::Secs, true)), updated_at: Some(item.updated_at.to_rfc3339_opts(SecondsFormat::Millis, true)), + recency_at: Some(item.recency_at.to_rfc3339_opts(SecondsFormat::Millis, true)), } } diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index c49a47794..2558750f1 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -984,6 +984,7 @@ fn fill_missing_thread_item_metadata_preserves_identity_and_prefers_state_git_fi model_provider: None, cli_version: None, created_at: None, + recency_at: Some("2025-01-03T15:59:00.000Z".to_string()), updated_at: None, }; let state_item = ThreadItem { @@ -1002,6 +1003,7 @@ fn fill_missing_thread_item_metadata_preserves_identity_and_prefers_state_git_fi model_provider: Some("state-provider".to_string()), cli_version: Some("state-version".to_string()), created_at: Some("2025-01-03T16:00:00Z".to_string()), + recency_at: Some("2025-01-03T16:00:30.001Z".to_string()), updated_at: Some("2025-01-03T16:01:02.003Z".to_string()), }; @@ -1027,6 +1029,7 @@ fn fill_missing_thread_item_metadata_preserves_identity_and_prefers_state_git_fi assert_eq!(item.model_provider.as_deref(), Some("state-provider")); assert_eq!(item.cli_version.as_deref(), Some("state-version")); assert_eq!(item.created_at.as_deref(), Some("2025-01-03T16:00:00Z")); + assert_eq!(item.recency_at.as_deref(), Some("2025-01-03T16:00:30.001Z")); assert_eq!(item.updated_at.as_deref(), Some("2025-01-03T16:01:02.003Z")); } diff --git a/codex-rs/rollout/src/state_db.rs b/codex-rs/rollout/src/state_db.rs index ab93305fa..96fbf7af1 100644 --- a/codex-rs/rollout/src/state_db.rs +++ b/codex-rs/rollout/src/state_db.rs @@ -290,7 +290,10 @@ fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option { let millis = cursor.timestamp().unix_timestamp_nanos() / 1_000_000; let millis = i64::try_from(millis).ok()?; let ts = chrono::DateTime::::from_timestamp_millis(millis)?; - Some(codex_state::Anchor { ts }) + Some(codex_state::Anchor { + ts, + id: cursor.thread_id(), + }) } pub fn normalize_cwd_for_state_db(cwd: &Path) -> PathBuf { @@ -336,6 +339,7 @@ pub async fn list_thread_ids_db( match sort_key { ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt, ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt, + ThreadSortKey::RecencyAt => codex_state::SortKey::RecencyAt, }, allowed_sources.as_slice(), model_providers.as_deref(), @@ -401,6 +405,7 @@ pub async fn list_threads_db( sort_key: match sort_key { ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt, ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt, + ThreadSortKey::RecencyAt => codex_state::SortKey::RecencyAt, }, sort_direction: match sort_direction { SortDirection::Asc => codex_state::SortDirection::Asc, diff --git a/codex-rs/rollout/src/state_db_tests.rs b/codex-rs/rollout/src/state_db_tests.rs index a84ede199..ba92ca27f 100644 --- a/codex-rs/rollout/src/state_db_tests.rs +++ b/codex-rs/rollout/src/state_db_tests.rs @@ -28,6 +28,22 @@ fn cursor_to_anchor_normalizes_timestamp_format() { .expect("nanosecond"); assert_eq!(anchor.ts, expected_ts); + assert_eq!(anchor.id, None); +} + +#[test] +fn cursor_to_anchor_preserves_recency_tie_breaker() { + let id = ThreadId::from_string("00000000-0000-0000-0000-000000000123") + .expect("thread id should parse"); + let token = format!("2026-01-27T12:34:56Z|{id}"); + let cursor = parse_cursor(&token).expect("cursor should parse"); + let anchor = cursor_to_anchor(Some(&cursor)).expect("anchor should parse"); + + assert_eq!(anchor.id, Some(id)); + assert_eq!( + serde_json::to_string(&cursor).expect("cursor should serialize"), + format!("\"{token}\"") + ); } #[tokio::test] diff --git a/codex-rs/rollout/src/tests.rs b/codex-rs/rollout/src/tests.rs index 3c1fc8bb5..0e3ae912d 100644 --- a/codex-rs/rollout/src/tests.rs +++ b/codex-rs/rollout/src/tests.rs @@ -612,6 +612,7 @@ async fn test_list_conversations_latest_first() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some("2025-01-03T12-00-00".into()), + recency_at: updated_times.first().cloned().flatten(), updated_at: updated_times.first().cloned().flatten(), }, ThreadItem { @@ -630,6 +631,7 @@ async fn test_list_conversations_latest_first() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some("2025-01-02T12-00-00".into()), + recency_at: updated_times.get(1).cloned().flatten(), updated_at: updated_times.get(1).cloned().flatten(), }, ThreadItem { @@ -648,6 +650,7 @@ async fn test_list_conversations_latest_first() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some("2025-01-01T12-00-00".into()), + recency_at: updated_times.get(2).cloned().flatten(), updated_at: updated_times.get(2).cloned().flatten(), }, ], @@ -759,6 +762,7 @@ async fn test_pagination_cursor() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some("2025-03-05T09-00-00".into()), + recency_at: updated_page1.first().cloned().flatten(), updated_at: updated_page1.first().cloned().flatten(), }, ThreadItem { @@ -777,6 +781,7 @@ async fn test_pagination_cursor() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some("2025-03-04T09-00-00".into()), + recency_at: updated_page1.get(1).cloned().flatten(), updated_at: updated_page1.get(1).cloned().flatten(), }, ], @@ -831,6 +836,7 @@ async fn test_pagination_cursor() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some("2025-03-03T09-00-00".into()), + recency_at: updated_page2.first().cloned().flatten(), updated_at: updated_page2.first().cloned().flatten(), }, ThreadItem { @@ -849,6 +855,7 @@ async fn test_pagination_cursor() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some("2025-03-02T09-00-00".into()), + recency_at: updated_page2.get(1).cloned().flatten(), updated_at: updated_page2.get(1).cloned().flatten(), }, ], @@ -895,6 +902,7 @@ async fn test_pagination_cursor() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some("2025-03-01T09-00-00".into()), + recency_at: updated_page3.first().cloned().flatten(), updated_at: updated_page3.first().cloned().flatten(), }], next_cursor: None, @@ -1066,6 +1074,7 @@ async fn test_get_thread_contents() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some(ts.into()), + recency_at: page.items[0].updated_at.clone(), updated_at: page.items[0].updated_at.clone(), }], next_cursor: None, @@ -1421,6 +1430,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some(ts.to_string()), + recency_at: updated_page1.first().cloned().flatten(), updated_at: updated_page1.first().cloned().flatten(), }, ThreadItem { @@ -1439,6 +1449,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() { model_provider: Some(TEST_PROVIDER.to_string()), cli_version: Some("test_version".to_string()), created_at: Some(ts.to_string()), + recency_at: updated_page1.get(1).cloned().flatten(), updated_at: updated_page1.get(1).cloned().flatten(), }, ], diff --git a/codex-rs/state/migrations/0038_threads_recency_at.sql b/codex-rs/state/migrations/0038_threads_recency_at.sql new file mode 100644 index 000000000..ccbf79f05 --- /dev/null +++ b/codex-rs/state/migrations/0038_threads_recency_at.sql @@ -0,0 +1,28 @@ +ALTER TABLE threads ADD COLUMN recency_at INTEGER NOT NULL DEFAULT 0; +ALTER TABLE threads ADD COLUMN recency_at_ms INTEGER NOT NULL DEFAULT 0; + +UPDATE threads +SET recency_at = updated_at, + recency_at_ms = updated_at_ms; + +-- Older binaries can open databases migrated by newer binaries. Seed recency +-- when one of those binaries inserts a thread without the new columns. +CREATE TRIGGER threads_recency_at_after_insert +AFTER INSERT ON threads +WHEN NEW.recency_at_ms = 0 +BEGIN + UPDATE threads + SET recency_at = NEW.updated_at, + recency_at_ms = COALESCE(NEW.updated_at_ms, NEW.updated_at * 1000) + WHERE id = NEW.id; +END; + +CREATE INDEX idx_threads_recency_at_ms + ON threads(recency_at_ms DESC, id DESC); + +CREATE INDEX idx_threads_archived_cwd_recency_at_ms + ON threads(archived, cwd, recency_at_ms DESC, id DESC); + +CREATE INDEX idx_threads_visible_recency_at_ms + ON threads(archived, recency_at_ms DESC, id DESC) + WHERE preview <> ''; diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index 2254b4da8..f2745a553 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -563,6 +563,7 @@ mod tests { rollout_path: PathBuf::from("/tmp/a.jsonl"), created_at, updated_at: created_at, + recency_at: created_at, source: "cli".to_string(), thread_source: None, agent_path: None, diff --git a/codex-rs/state/src/migrations.rs b/codex-rs/state/src/migrations.rs index 0e12f17c4..8637544fa 100644 --- a/codex-rs/state/src/migrations.rs +++ b/codex-rs/state/src/migrations.rs @@ -39,3 +39,7 @@ pub(crate) fn runtime_goals_migrator() -> Migrator { pub(crate) fn runtime_memories_migrator() -> Migrator { runtime_migrator(&MEMORIES_MIGRATOR) } + +#[cfg(test)] +#[path = "migrations_tests.rs"] +mod tests; diff --git a/codex-rs/state/src/migrations_tests.rs b/codex-rs/state/src/migrations_tests.rs new file mode 100644 index 000000000..f7f137a5e --- /dev/null +++ b/codex-rs/state/src/migrations_tests.rs @@ -0,0 +1,129 @@ +use std::borrow::Cow; + +use sqlx::Row; +use sqlx::migrate::Migrator; +use sqlx::sqlite::SqlitePoolOptions; + +use super::STATE_MIGRATOR; + +fn migrator_through(version: i64) -> Migrator { + Migrator { + migrations: Cow::Owned( + STATE_MIGRATOR + .migrations + .iter() + .filter(|migration| migration.version <= version) + .cloned() + .collect(), + ), + ignore_missing: STATE_MIGRATOR.ignore_missing, + locking: STATE_MIGRATOR.locking, + table_name: STATE_MIGRATOR.table_name.clone(), + create_schemas: STATE_MIGRATOR.create_schemas.clone(), + no_tx: STATE_MIGRATOR.no_tx, + } +} + +#[tokio::test] +async fn recency_migration_backfills_and_seeds_old_binary_inserts() { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("in-memory database should open"); + migrator_through(/*version*/ 37) + .run(&pool) + .await + .expect("pre-recency migrations should apply"); + + sqlx::query( + r#" +INSERT INTO threads ( + id, + rollout_path, + created_at, + updated_at, + created_at_ms, + updated_at_ms, + source, + model_provider, + cwd, + title, + sandbox_policy, + approval_mode +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind("00000000-0000-0000-0000-000000000001") + .bind("/tmp/first.jsonl") + .bind(1_700_000_000_i64) + .bind(1_700_000_100_i64) + .bind(1_700_000_000_123_i64) + .bind(1_700_000_100_456_i64) + .bind("cli") + .bind("openai") + .bind("/tmp") + .bind("") + .bind("read-only") + .bind("on-request") + .execute(&pool) + .await + .expect("legacy row should insert"); + + STATE_MIGRATOR + .run(&pool) + .await + .expect("recency migration should apply"); + + let backfilled = sqlx::query( + "SELECT updated_at, updated_at_ms, recency_at, recency_at_ms FROM threads WHERE id = ?", + ) + .bind("00000000-0000-0000-0000-000000000001") + .fetch_one(&pool) + .await + .expect("backfilled row should load"); + assert_eq!(backfilled.get::("recency_at"), 1_700_000_100); + assert_eq!(backfilled.get::("recency_at_ms"), 1_700_000_100_456); + + sqlx::query( + r#" +INSERT INTO threads ( + id, + rollout_path, + created_at, + updated_at, + created_at_ms, + updated_at_ms, + source, + model_provider, + cwd, + title, + sandbox_policy, + approval_mode +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind("00000000-0000-0000-0000-000000000002") + .bind("/tmp/second.jsonl") + .bind(1_700_000_200_i64) + .bind(1_700_000_300_i64) + .bind(1_700_000_200_123_i64) + .bind(1_700_000_300_456_i64) + .bind("cli") + .bind("openai") + .bind("/tmp") + .bind("") + .bind("read-only") + .bind("on-request") + .execute(&pool) + .await + .expect("old-binary row should insert"); + + let seeded = sqlx::query("SELECT recency_at, recency_at_ms FROM threads WHERE id = ?") + .bind("00000000-0000-0000-0000-000000000002") + .fetch_one(&pool) + .await + .expect("old-binary row should load"); + assert_eq!(seeded.get::("recency_at"), 1_700_000_300); + assert_eq!(seeded.get::("recency_at_ms"), 1_700_000_300_456); +} diff --git a/codex-rs/state/src/model/thread_metadata.rs b/codex-rs/state/src/model/thread_metadata.rs index a30f7e479..c25c45898 100644 --- a/codex-rs/state/src/model/thread_metadata.rs +++ b/codex-rs/state/src/model/thread_metadata.rs @@ -18,6 +18,8 @@ pub enum SortKey { CreatedAt, /// Sort by the thread's last update timestamp. UpdatedAt, + /// Sort by the thread's product recency timestamp. + RecencyAt, } /// Sort direction to use when listing threads. @@ -32,6 +34,8 @@ pub enum SortDirection { pub struct Anchor { /// The timestamp component of the anchor. pub ts: DateTime, + /// The thread ID component used to disambiguate equal recency timestamps. + pub id: Option, } /// A single page of thread metadata results. @@ -67,6 +71,8 @@ pub struct ThreadMetadata { pub created_at: DateTime, /// The last update timestamp. pub updated_at: DateTime, + /// The product recency timestamp. + pub recency_at: DateTime, /// The session source (stringified enum). pub source: String, /// Optional analytics source classification for this thread. @@ -120,6 +126,8 @@ pub struct ThreadMetadataBuilder { pub created_at: DateTime, /// The last update timestamp, if known. pub updated_at: Option>, + /// The product recency timestamp, if known. + pub recency_at: Option>, /// The session source. pub source: SessionSource, /// Optional analytics source classification for this thread. @@ -163,6 +171,7 @@ impl ThreadMetadataBuilder { rollout_path, created_at, updated_at: None, + recency_at: None, source, thread_source: None, agent_nickname: None, @@ -190,11 +199,16 @@ impl ThreadMetadataBuilder { .updated_at .map(canonicalize_datetime) .unwrap_or(created_at); + let recency_at = self + .recency_at + .map(canonicalize_datetime) + .unwrap_or(updated_at); ThreadMetadata { id: self.id, rollout_path: self.rollout_path.clone(), created_at, updated_at, + recency_at, source, thread_source: self.thread_source.clone(), agent_nickname: self.agent_nickname.clone(), @@ -340,6 +354,7 @@ pub(crate) struct ThreadRow { rollout_path: String, created_at: i64, updated_at: i64, + recency_at: i64, source: String, thread_source: Option, agent_nickname: Option, @@ -369,6 +384,7 @@ impl ThreadRow { rollout_path: row.try_get("rollout_path")?, created_at: row.try_get("created_at")?, updated_at: row.try_get("updated_at")?, + recency_at: row.try_get("recency_at")?, source: row.try_get("source")?, thread_source: row.try_get("thread_source")?, agent_nickname: row.try_get("agent_nickname")?, @@ -402,6 +418,7 @@ impl TryFrom for ThreadMetadata { rollout_path, created_at, updated_at, + recency_at, source, thread_source, agent_nickname, @@ -432,6 +449,7 @@ impl TryFrom for ThreadMetadata { rollout_path: PathBuf::from(rollout_path), created_at: epoch_millis_to_datetime(created_at)?, updated_at: epoch_millis_to_datetime(updated_at)?, + recency_at: epoch_millis_to_datetime(recency_at)?, source, thread_source, agent_nickname, @@ -461,8 +479,12 @@ pub(crate) fn anchor_from_item(item: &ThreadMetadata, sort_key: SortKey) -> Opti let ts = match sort_key { SortKey::CreatedAt => item.created_at, SortKey::UpdatedAt => item.updated_at, + SortKey::RecencyAt => item.recency_at, }; - Some(Anchor { ts }) + Some(Anchor { + ts, + id: (sort_key == SortKey::RecencyAt).then_some(item.id), + }) } pub(crate) fn datetime_to_epoch_millis(dt: DateTime) -> i64 { @@ -519,6 +541,7 @@ mod tests { rollout_path: "/tmp/rollout-123.jsonl".to_string(), created_at: 1_700_000_000, updated_at: 1_700_000_100, + recency_at: 1_700_000_100, source: "cli".to_string(), thread_source: None, agent_nickname: None, @@ -549,6 +572,7 @@ mod tests { rollout_path: PathBuf::from("/tmp/rollout-123.jsonl"), created_at: DateTime::::from_timestamp(1_700_000_000, 0).expect("timestamp"), updated_at: DateTime::::from_timestamp(1_700_000_100, 0).expect("timestamp"), + recency_at: DateTime::::from_timestamp(1_700_000_100, 0).expect("timestamp"), source: "cli".to_string(), thread_source: None, agent_nickname: None, diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 0e5dc8558..2ef524889 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -160,6 +160,7 @@ pub struct StateRuntime { thread_goals: GoalStore, memories: MemoryStore, thread_updated_at_millis: Arc, + thread_recency_at_millis: Arc, } impl StateRuntime { @@ -262,32 +263,36 @@ impl StateRuntime { return Err(err); } let started = Instant::now(); - let thread_updated_at_millis_result: anyhow::Result> = - sqlx::query_scalar("SELECT MAX(threads.updated_at_ms) FROM threads") - .fetch_one(pool.as_ref()) - .await - .map_err(anyhow::Error::from); + let thread_timestamp_millis_result: anyhow::Result<(Option, Option)> = + sqlx::query_as( + "SELECT MAX(threads.updated_at_ms), MAX(threads.recency_at_ms) FROM threads", + ) + .fetch_one(pool.as_ref()) + .await + .map_err(anyhow::Error::from); crate::telemetry::record_init_result( telemetry_override, DbKind::State, "post_init_query", started.elapsed(), - &thread_updated_at_millis_result, + &thread_timestamp_millis_result, ); - let thread_updated_at_millis = match thread_updated_at_millis_result { - Ok(value) => value, - Err(err) => { - close_sqlite_pools(&[ - pool.as_ref(), - logs_pool.as_ref(), - goals_pool.as_ref(), - memories_pool.as_ref(), - ]) - .await; - return Err(err); - } - }; + let (thread_updated_at_millis, thread_recency_at_millis) = + match thread_timestamp_millis_result { + Ok(value) => value, + Err(err) => { + close_sqlite_pools(&[ + pool.as_ref(), + logs_pool.as_ref(), + goals_pool.as_ref(), + memories_pool.as_ref(), + ]) + .await; + return Err(err); + } + }; let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0); + let thread_recency_at_millis = thread_recency_at_millis.unwrap_or(0); let runtime = Arc::new(Self { thread_goals: GoalStore::new(Arc::clone(&goals_pool)), memories: MemoryStore::new(Arc::clone(&memories_pool), Arc::clone(&pool)), @@ -296,6 +301,7 @@ impl StateRuntime { codex_home, default_provider, thread_updated_at_millis: Arc::new(AtomicI64::new(thread_updated_at_millis)), + thread_recency_at_millis: Arc::new(AtomicI64::new(thread_recency_at_millis)), }); if let Err(err) = runtime.run_logs_startup_maintenance().await { warn!( diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 895c25435..9b2c337f5 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -175,6 +175,7 @@ SELECT threads.rollout_path, threads.created_at_ms AS created_at, threads.updated_at_ms AS updated_at, + threads.recency_at_ms AS recency_at, threads.source, threads.thread_source, threads.agent_path, @@ -545,6 +546,7 @@ SELECT threads.rollout_path, threads.created_at_ms AS created_at, threads.updated_at_ms AS updated_at, + threads.recency_at_ms AS recency_at, threads.source, threads.thread_source, threads.agent_nickname, diff --git a/codex-rs/state/src/runtime/test_support.rs b/codex-rs/state/src/runtime/test_support.rs index 848a19333..c029a0ee2 100644 --- a/codex-rs/state/src/runtime/test_support.rs +++ b/codex-rs/state/src/runtime/test_support.rs @@ -47,6 +47,7 @@ pub(super) fn test_thread_metadata( rollout_path: codex_home.join(format!("rollout-{thread_id}.jsonl")), created_at: now, updated_at: now, + recency_at: now, source: "cli".to_string(), thread_source: None, agent_nickname: None, diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 81a490a0c..7bf354437 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -1,6 +1,7 @@ use super::*; use crate::SortDirection; use codex_protocol::protocol::SessionSource; +use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; impl StateRuntime { @@ -12,6 +13,7 @@ SELECT threads.rollout_path, threads.created_at_ms AS created_at, threads.updated_at_ms AS updated_at, + threads.recency_at_ms AS recency_at, threads.source, threads.thread_source, threads.agent_nickname, @@ -498,6 +500,7 @@ ON CONFLICT(child_thread_id) DO NOTHING metadata: &crate::ThreadMetadata, ) -> anyhow::Result { let updated_at = self.allocate_thread_updated_at(metadata.updated_at)?; + let recency_at = self.allocate_thread_recency_at(metadata.recency_at)?; let preview = metadata_preview(metadata); let result = sqlx::query( r#" @@ -506,8 +509,10 @@ INSERT INTO threads ( rollout_path, created_at, updated_at, + recency_at, created_at_ms, updated_at_ms, + recency_at_ms, source, thread_source, agent_nickname, @@ -530,7 +535,7 @@ INSERT INTO threads ( git_branch, git_origin_url, memory_mode -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO NOTHING "#, ) @@ -538,8 +543,10 @@ ON CONFLICT(id) DO NOTHING .bind(metadata.rollout_path.display().to_string()) .bind(datetime_to_epoch_seconds(metadata.created_at)) .bind(datetime_to_epoch_seconds(updated_at)) + .bind(datetime_to_epoch_seconds(recency_at)) .bind(datetime_to_epoch_millis(metadata.created_at)) .bind(datetime_to_epoch_millis(updated_at)) + .bind(datetime_to_epoch_millis(recency_at)) .bind(metadata.source.as_str()) .bind( metadata @@ -621,6 +628,32 @@ ON CONFLICT(id) DO NOTHING Ok(result.rows_affected() > 0) } + pub async fn touch_thread_recency_at( + &self, + thread_id: ThreadId, + recency_at: DateTime, + ) -> anyhow::Result { + let recency_at = self.allocate_thread_recency_at(recency_at)?; + let recency_at_seconds = datetime_to_epoch_seconds(recency_at); + let recency_at_millis = datetime_to_epoch_millis(recency_at); + let result = sqlx::query( + r#" +UPDATE threads +SET + recency_at = MAX(?, MAX(?, recency_at_ms + 1) / 1000), + recency_at_ms = MAX(?, recency_at_ms + 1) +WHERE id = ? + "#, + ) + .bind(recency_at_seconds) + .bind(recency_at_millis) + .bind(recency_at_millis) + .bind(thread_id.to_string()) + .execute(self.pool.as_ref()) + .await?; + Ok(result.rows_affected() > 0) + } + /// Allocate a persisted `updated_at` value for thread-list cursor ordering. /// /// We keep a process-local high-water mark so hot rollout writes can get unique, @@ -631,42 +664,56 @@ ON CONFLICT(id) DO NOTHING &self, updated_at: DateTime, ) -> anyhow::Result> { - let candidate = datetime_to_epoch_millis(updated_at); - let allocated = loop { - let current = self.thread_updated_at_millis.load(Ordering::Relaxed); - - // New wall-clock time: advance the process-local high-water mark and use it as-is. - if candidate > current { - if self - .thread_updated_at_millis - .compare_exchange(current, candidate, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - break candidate; - } - continue; - } - - // Older timestamps come from backfill/repair paths that preserve rollout mtimes. - // Do not drag historical rows forward just because this process has seen newer writes. - if candidate.saturating_add(1000) <= current { - break candidate; - } - - // Same hot one-second bucket as the current high-water mark. Allocate the next - // millisecond so updated_at remains unique and cursor-orderable inside the process. - let bumped = current.saturating_add(1); - if self - .thread_updated_at_millis - .compare_exchange(current, bumped, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - break bumped; - } - }; - epoch_millis_to_datetime(allocated) + allocate_thread_timestamp(self.thread_updated_at_millis.as_ref(), updated_at) } + fn allocate_thread_recency_at( + &self, + recency_at: DateTime, + ) -> anyhow::Result> { + allocate_thread_timestamp(self.thread_recency_at_millis.as_ref(), recency_at) + } +} + +fn allocate_thread_timestamp( + high_water_mark: &AtomicI64, + timestamp: DateTime, +) -> anyhow::Result> { + let candidate = datetime_to_epoch_millis(timestamp); + let allocated = loop { + let current = high_water_mark.load(Ordering::Relaxed); + + // New wall-clock time: advance the process-local high-water mark and use it as-is. + if candidate > current { + if high_water_mark + .compare_exchange(current, candidate, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + break candidate; + } + continue; + } + + // Older timestamps come from backfill/repair paths that preserve rollout mtimes. + // Do not drag historical rows forward just because this process has seen newer writes. + if candidate.saturating_add(1000) <= current { + break candidate; + } + + // Same hot one-second bucket as the current high-water mark. Allocate the next + // millisecond so the timestamp remains unique and cursor-orderable inside the process. + let bumped = current.saturating_add(1); + if high_water_mark + .compare_exchange(current, bumped, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + break bumped; + } + }; + epoch_millis_to_datetime(allocated) +} + +impl StateRuntime { pub async fn update_thread_git_info( &self, thread_id: ThreadId, @@ -702,6 +749,7 @@ WHERE id = ? creation_memory_mode: Option<&str>, ) -> anyhow::Result<()> { let updated_at = self.allocate_thread_updated_at(metadata.updated_at)?; + let insert_recency_at = self.allocate_thread_recency_at(metadata.recency_at)?; let preview = metadata_preview(metadata); // Backfill/reconcile callers merge existing git info before upserting, but that // read/modify/write is not atomic. Preserve non-null SQLite git fields here so @@ -713,8 +761,10 @@ INSERT INTO threads ( rollout_path, created_at, updated_at, + recency_at, created_at_ms, updated_at_ms, + recency_at_ms, source, thread_source, agent_nickname, @@ -737,13 +787,15 @@ INSERT INTO threads ( git_branch, git_origin_url, memory_mode -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET rollout_path = excluded.rollout_path, created_at = excluded.created_at, updated_at = excluded.updated_at, + recency_at = threads.recency_at, created_at_ms = excluded.created_at_ms, updated_at_ms = excluded.updated_at_ms, + recency_at_ms = threads.recency_at_ms, source = excluded.source, thread_source = excluded.thread_source, agent_nickname = excluded.agent_nickname, @@ -771,8 +823,10 @@ ON CONFLICT(id) DO UPDATE SET .bind(metadata.rollout_path.display().to_string()) .bind(datetime_to_epoch_seconds(metadata.created_at)) .bind(datetime_to_epoch_seconds(updated_at)) + .bind(datetime_to_epoch_seconds(insert_recency_at)) .bind(datetime_to_epoch_millis(metadata.created_at)) .bind(datetime_to_epoch_millis(updated_at)) + .bind(datetime_to_epoch_millis(insert_recency_at)) .bind(metadata.source.as_str()) .bind( metadata @@ -1079,6 +1133,7 @@ SELECT threads.rollout_path, threads.created_at_ms AS created_at, threads.updated_at_ms AS updated_at, + threads.recency_at_ms AS recency_at, threads.source, threads.thread_source, threads.agent_nickname, @@ -1197,6 +1252,7 @@ pub(super) fn push_thread_filters<'a>( let column = match sort_key { SortKey::CreatedAt => "threads.created_at_ms", SortKey::UpdatedAt => "threads.updated_at_ms", + SortKey::RecencyAt => "threads.recency_at_ms", }; let operator = match sort_direction { SortDirection::Asc => ">", @@ -1208,6 +1264,19 @@ pub(super) fn push_thread_filters<'a>( builder.push(operator); builder.push(" "); builder.push_bind(anchor_ts); + if sort_key == SortKey::RecencyAt + && let Some(anchor_id) = anchor.id + { + builder.push(" OR ("); + builder.push(column); + builder.push(" = "); + builder.push_bind(anchor_ts); + builder.push(" AND threads.id "); + builder.push(operator); + builder.push(" "); + builder.push_bind(anchor_id.to_string()); + builder.push(")"); + } builder.push(")"); } } @@ -1232,6 +1301,7 @@ pub(super) fn push_thread_order_and_limit( let order_column = match sort_key { SortKey::CreatedAt => "threads.created_at_ms", SortKey::UpdatedAt => "threads.updated_at_ms", + SortKey::RecencyAt => "threads.recency_at_ms", }; let order_direction = match sort_direction { SortDirection::Asc => "ASC", @@ -1247,6 +1317,10 @@ pub(super) fn push_thread_order_and_limit( builder.push(order_column); builder.push(" "); builder.push(order_direction); + if sort_key == SortKey::RecencyAt { + builder.push(", threads.id "); + builder.push(order_direction); + } builder.push(" LIMIT "); builder.push_bind(limit as i64); } @@ -1521,6 +1595,7 @@ mod tests { let anchor = Anchor { ts: older_updated_at, + id: None, }; let model_providers = ["test-provider".to_string()]; let page = runtime @@ -1547,6 +1622,7 @@ mod tests { Some(Anchor { ts: DateTime::::from_timestamp_millis(1_700_000_200_000) .expect("valid timestamp"), + id: None, }) ); @@ -1631,6 +1707,7 @@ mod tests { Some(Anchor { ts: DateTime::::from_timestamp_millis(1_700_000_300_000) .expect("valid timestamp"), + id: None, }) ); @@ -1693,6 +1770,7 @@ mod tests { ]; let anchor = Anchor { ts: DateTime::::from_timestamp(1_700_000_000, 0).expect("valid timestamp"), + id: None, }; for (sort_key, visible_index, cwd_index) in [ ( @@ -1705,6 +1783,11 @@ mod tests { "idx_threads_visible_updated_at_ms", "idx_threads_archived_cwd_updated_at_ms", ), + ( + SortKey::RecencyAt, + "idx_threads_visible_recency_at_ms", + "idx_threads_archived_cwd_recency_at_ms", + ), ] { for (cwd_filters, anchor, expected_index, expect_temp_sort) in [ (None, None, visible_index, false), @@ -2279,6 +2362,186 @@ mod tests { assert_eq!(persisted.preview.as_deref(), Some("first-user-message")); } + #[tokio::test] + async fn touch_thread_recency_at_is_monotonic_and_survives_stale_upsert() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("state db should initialize"); + let thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000792").expect("valid thread id"); + let mut metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone()); + let original_recency_at = metadata.recency_at; + runtime + .upsert_thread(&metadata) + .await + .expect("initial upsert should succeed"); + + let touched_at = + DateTime::::from_timestamp_millis(1_700_001_111_123).expect("timestamp"); + assert!( + runtime + .touch_thread_recency_at(thread_id, touched_at) + .await + .expect("touch should succeed") + ); + + metadata.updated_at = + DateTime::::from_timestamp_millis(1_700_001_222_456).expect("timestamp"); + metadata.title = "updated metadata".to_string(); + assert_eq!(metadata.recency_at, original_recency_at); + runtime + .upsert_thread(&metadata) + .await + .expect("stale metadata upsert should succeed"); + + let persisted = runtime + .get_thread(thread_id) + .await + .expect("thread should load") + .expect("thread should exist"); + assert_eq!(persisted.recency_at, touched_at); + assert_eq!(persisted.updated_at, metadata.updated_at); + assert_eq!(persisted.title, "updated metadata"); + + assert!( + runtime + .touch_thread_recency_at(thread_id, original_recency_at) + .await + .expect("older touch should succeed") + ); + let persisted = runtime + .get_thread(thread_id) + .await + .expect("thread should load") + .expect("thread should exist"); + assert_eq!( + datetime_to_epoch_millis(persisted.recency_at), + datetime_to_epoch_millis(touched_at) + 1 + ); + } + + #[tokio::test] + async fn list_threads_orders_and_pages_by_recency_at() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("state db should initialize"); + let first_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000793").expect("valid thread id"); + let second_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000794").expect("valid thread id"); + let third_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000795").expect("valid thread id"); + let recency_at = + DateTime::::from_timestamp_millis(1_700_002_000_456).expect("timestamp"); + + for thread_id in [first_id, second_id, third_id] { + let mut metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone()); + metadata.recency_at = recency_at; + runtime + .upsert_thread(&metadata) + .await + .expect("thread insert should succeed"); + } + sqlx::query("UPDATE threads SET recency_at = ?, recency_at_ms = ?") + .bind(datetime_to_epoch_seconds(recency_at)) + .bind(datetime_to_epoch_millis(recency_at)) + .execute(runtime.pool.as_ref()) + .await + .expect("recency timestamps should match"); + + let first_page = runtime + .list_threads( + /*page_size*/ 1, + ThreadFilterOptions { + archived_only: false, + allowed_sources: &[], + model_providers: None, + cwd_filters: None, + anchor: None, + sort_key: SortKey::RecencyAt, + sort_direction: SortDirection::Desc, + search_term: None, + }, + ) + .await + .expect("list should succeed"); + assert_eq!( + first_page + .items + .iter() + .map(|item| item.id) + .collect::>(), + vec![third_id] + ); + assert_eq!( + first_page.next_anchor, + Some(Anchor { + ts: recency_at, + id: Some(third_id), + }) + ); + + let second_page = runtime + .list_threads( + /*page_size*/ 1, + ThreadFilterOptions { + archived_only: false, + allowed_sources: &[], + model_providers: None, + cwd_filters: None, + anchor: first_page.next_anchor.as_ref(), + sort_key: SortKey::RecencyAt, + sort_direction: SortDirection::Desc, + search_term: None, + }, + ) + .await + .expect("second list should succeed"); + assert_eq!( + second_page + .items + .iter() + .map(|item| item.id) + .collect::>(), + vec![second_id] + ); + assert_eq!( + second_page.next_anchor, + Some(Anchor { + ts: recency_at, + id: Some(second_id), + }) + ); + + let third_page = runtime + .list_threads( + /*page_size*/ 1, + ThreadFilterOptions { + archived_only: false, + allowed_sources: &[], + model_providers: None, + cwd_filters: None, + anchor: second_page.next_anchor.as_ref(), + sort_key: SortKey::RecencyAt, + sort_direction: SortDirection::Desc, + search_term: None, + }, + ) + .await + .expect("third list should succeed"); + assert_eq!( + third_page + .items + .iter() + .map(|item| item.id) + .collect::>(), + vec![first_id] + ); + assert_eq!(third_page.next_anchor, None); + } + #[tokio::test] async fn thread_updated_at_uses_unique_epoch_millis_and_reads_legacy_seconds() { let codex_home = unique_temp_dir(); @@ -2295,8 +2558,10 @@ mod tests { DateTime::::from_timestamp_millis(1_700_001_111_123).expect("timestamp millis"); let mut first = test_thread_metadata(&codex_home, first_id, codex_home.clone()); first.updated_at = updated_at; + first.recency_at = updated_at; let mut second = test_thread_metadata(&codex_home, second_id, codex_home.clone()); second.updated_at = updated_at; + second.recency_at = updated_at; runtime .upsert_thread(&first) @@ -2325,6 +2590,14 @@ mod tests { datetime_to_epoch_millis(second.updated_at), 1_700_001_111_124 ); + assert_eq!( + datetime_to_epoch_millis(first.recency_at), + 1_700_001_111_123 + ); + assert_eq!( + datetime_to_epoch_millis(second.recency_at), + 1_700_001_111_124 + ); let second_row: (i64, i64, Option, Option) = sqlx::query_as( "SELECT created_at, updated_at, created_at_ms, updated_at_ms FROM threads WHERE id = ?", ) diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index d02b78d23..7eda54a9c 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -535,6 +535,9 @@ fn stored_thread_from_state( updated_at: metadata .and_then(|metadata| metadata.updated_at) .unwrap_or_else(Utc::now), + recency_at: metadata + .and_then(|metadata| metadata.advance_recency_at.or(metadata.updated_at)) + .unwrap_or_else(Utc::now), archived_at: None, cwd: metadata .and_then(|metadata| metadata.cwd.clone()) diff --git a/codex-rs/thread-store/src/local/archive_thread.rs b/codex-rs/thread-store/src/local/archive_thread.rs index bbe5c3de5..eb4a48b68 100644 --- a/codex-rs/thread-store/src/local/archive_thread.rs +++ b/codex-rs/thread-store/src/local/archive_thread.rs @@ -174,5 +174,6 @@ mod tests { .expect("thread metadata should exist"); assert_eq!(updated.rollout_path, archived_path); assert!(updated.archived_at.is_some()); + assert_eq!(updated.recency_at, metadata.recency_at); } } diff --git a/codex-rs/thread-store/src/local/helpers.rs b/codex-rs/thread-store/src/local/helpers.rs index 5acdcf09b..f48a0136c 100644 --- a/codex-rs/thread-store/src/local/helpers.rs +++ b/codex-rs/thread-store/src/local/helpers.rs @@ -106,6 +106,7 @@ pub(super) fn stored_thread_from_rollout_item( .or_else(|| thread_id_from_rollout_path(item.path.as_path()))?; let created_at = parse_rfc3339(item.created_at.as_deref()).unwrap_or_else(Utc::now); let updated_at = parse_rfc3339(item.updated_at.as_deref()).unwrap_or(created_at); + let recency_at = parse_rfc3339(item.recency_at.as_deref()).unwrap_or(updated_at); let archived_at = archived.then_some(updated_at); let git_info = git_info_from_parts( item.git_sha.clone(), @@ -136,6 +137,7 @@ pub(super) fn stored_thread_from_rollout_item( reasoning_effort: None, created_at, updated_at, + recency_at, archived_at, cwd: item.cwd.unwrap_or_default(), cli_version: item.cli_version.unwrap_or_default(), diff --git a/codex-rs/thread-store/src/local/list_threads.rs b/codex-rs/thread-store/src/local/list_threads.rs index fc3b5034a..03ae163af 100644 --- a/codex-rs/thread-store/src/local/list_threads.rs +++ b/codex-rs/thread-store/src/local/list_threads.rs @@ -34,6 +34,7 @@ pub(super) async fn list_threads( let sort_key = match params.sort_key { ThreadSortKey::CreatedAt => codex_rollout::ThreadSortKey::CreatedAt, ThreadSortKey::UpdatedAt => codex_rollout::ThreadSortKey::UpdatedAt, + ThreadSortKey::RecencyAt => codex_rollout::ThreadSortKey::RecencyAt, }; let sort_direction = match params.sort_direction { SortDirection::Asc => codex_rollout::SortDirection::Asc, diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index dd49fd2a5..5670e5638 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -314,10 +314,16 @@ mod tests { use codex_protocol::ThreadId; use codex_protocol::models::BaseInstructions; + use codex_protocol::models::FunctionCallOutputPayload; + use codex_protocol::models::MessagePhase; + use codex_protocol::models::ResponseItem; + use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::ThreadMemoryMode; + use codex_protocol::protocol::TurnCompleteEvent; + use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use tempfile::TempDir; @@ -449,6 +455,91 @@ mod tests { assert_eq!(metadata.title, "observed append"); } + #[tokio::test] + async fn live_thread_output_advances_updated_at_but_not_recency_at() { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = Arc::new(LocalThreadStore::new(config, Some(runtime.clone()))); + let thread_id = ThreadId::default(); + let live_thread = LiveThread::create(store, create_thread_params(thread_id)) + .await + .expect("create live thread"); + + live_thread + .append_items(&[user_message_item("start thread")]) + .await + .expect("append initial user message"); + live_thread.flush().await.expect("flush thread"); + let before_turn_start = runtime + .get_thread(thread_id) + .await + .expect("sqlite metadata read") + .expect("sqlite metadata"); + + live_thread + .append_items(&[RolloutItem::EventMsg(EventMsg::TurnStarted( + TurnStartedEvent { + turn_id: "turn-1".to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + }, + ))]) + .await + .expect("append turn start"); + live_thread.flush().await.expect("flush thread"); + let after_turn_start = runtime + .get_thread(thread_id) + .await + .expect("sqlite metadata read") + .expect("sqlite metadata"); + assert!(after_turn_start.recency_at > before_turn_start.recency_at); + + live_thread + .append_items(&[ + RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent { + message: "commentary".to_string(), + phase: Some(MessagePhase::Commentary), + memory_citation: None, + })), + RolloutItem::ResponseItem(ResponseItem::FunctionCallOutput { + call_id: "call-1".to_string(), + output: FunctionCallOutputPayload::from_text("tool output".to_string()), + }), + RolloutItem::EventMsg(EventMsg::TokenCount( + codex_protocol::protocol::TokenCountEvent { + info: None, + rate_limits: None, + }, + )), + RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-1".to_string(), + last_agent_message: None, + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + })), + ]) + .await + .expect("append post-start items"); + live_thread.flush().await.expect("flush thread"); + let completed = runtime + .get_thread(thread_id) + .await + .expect("sqlite metadata read") + .expect("sqlite metadata"); + + assert!(completed.updated_at > after_turn_start.updated_at); + assert_eq!(completed.recency_at, after_turn_start.recency_at); + } + #[tokio::test] async fn live_thread_shutdown_does_not_materialize_empty_thread_metadata() { let home = TempDir::new().expect("temp dir"); diff --git a/codex-rs/thread-store/src/local/read_thread.rs b/codex-rs/thread-store/src/local/read_thread.rs index 4778dd69d..a1e1c5919 100644 --- a/codex-rs/thread-store/src/local/read_thread.rs +++ b/codex-rs/thread-store/src/local/read_thread.rs @@ -55,6 +55,7 @@ pub(super) async fn read_thread( && (params.include_archived || rollout_thread.archived_at.is_none()) && !rollout_thread.preview.is_empty() { + rollout_thread.recency_at = thread.recency_at; if thread.name.is_some() { rollout_thread.name = thread.name; } @@ -115,6 +116,7 @@ pub(super) async fn read_thread_by_rollout_path( }); } if let Some(metadata) = read_sqlite_metadata(store, thread.thread_id).await { + thread.recency_at = metadata.recency_at; let existing_git_info = thread.git_info.take(); let (fallback_sha, fallback_branch, fallback_origin_url) = match existing_git_info { Some(info) => ( @@ -340,6 +342,7 @@ async fn stored_thread_from_sqlite_metadata( reasoning_effort: metadata.reasoning_effort, created_at: metadata.created_at, updated_at: metadata.updated_at, + recency_at: metadata.recency_at, archived_at: metadata.archived_at, cwd: metadata.cwd, cli_version: metadata.cli_version, @@ -406,6 +409,7 @@ fn stored_thread_from_meta_line( reasoning_effort: None, created_at, updated_at, + recency_at: updated_at, archived_at: archived.then_some(updated_at), cwd: meta_line.meta.cwd, cli_version: meta_line.meta.cli_version, @@ -547,6 +551,10 @@ mod tests { ); builder.model_provider = Some(config.default_model_provider_id.clone()); builder.git_branch = Some("sqlite-branch".to_string()); + let recency_at = chrono::DateTime::parse_from_rfc3339("2026-01-03T12:00:00Z") + .expect("timestamp should parse") + .with_timezone(&Utc); + builder.recency_at = Some(recency_at); runtime .upsert_thread(&builder.build(config.default_model_provider_id.as_str())) .await @@ -562,6 +570,7 @@ mod tests { .expect("read thread by rollout path"); let git_info = thread.git_info.expect("git info should be present"); + assert_eq!(thread.recency_at, recency_at); assert_eq!(git_info.branch.as_deref(), Some("sqlite-branch")); assert_eq!( git_info.commit_hash.as_ref().map(|sha| sha.0.as_str()), diff --git a/codex-rs/thread-store/src/local/search_threads.rs b/codex-rs/thread-store/src/local/search_threads.rs index 1b7f18a32..fc2ddd6e7 100644 --- a/codex-rs/thread-store/src/local/search_threads.rs +++ b/codex-rs/thread-store/src/local/search_threads.rs @@ -23,6 +23,10 @@ use crate::ThreadSortKey; use crate::ThreadStoreError; use crate::ThreadStoreResult; +#[cfg(test)] +#[path = "search_threads_tests.rs"] +mod tests; + struct ThreadSearchItem { item: codex_rollout::ThreadItem, snippet: String, @@ -50,6 +54,7 @@ pub(super) async fn search_threads( let sort_key = match params.sort_key { ThreadSortKey::CreatedAt => codex_rollout::ThreadSortKey::CreatedAt, ThreadSortKey::UpdatedAt => codex_rollout::ThreadSortKey::UpdatedAt, + ThreadSortKey::RecencyAt => codex_rollout::ThreadSortKey::RecencyAt, }; let sort_direction = match params.sort_direction { SortDirection::Asc => codex_rollout::SortDirection::Asc, @@ -179,8 +184,17 @@ fn cursor_from_thread_search_item( .updated_at .as_deref() .or(item.item.created_at.as_deref())?, + ThreadSortKey::RecencyAt => item + .item + .recency_at + .as_deref() + .or(item.item.updated_at.as_deref()) + .or(item.item.created_at.as_deref())?, }; - parse_cursor(timestamp) + match sort_key { + ThreadSortKey::RecencyAt => parse_cursor(&format!("{timestamp}|{}", item.item.thread_id?)), + ThreadSortKey::CreatedAt | ThreadSortKey::UpdatedAt => parse_cursor(timestamp), + } } async fn set_thread_search_result_names( diff --git a/codex-rs/thread-store/src/local/search_threads_tests.rs b/codex-rs/thread-store/src/local/search_threads_tests.rs new file mode 100644 index 000000000..bfcf76827 --- /dev/null +++ b/codex-rs/thread-store/src/local/search_threads_tests.rs @@ -0,0 +1,29 @@ +use codex_protocol::ThreadId; +use codex_rollout::ThreadItem; +use pretty_assertions::assert_eq; + +use super::ThreadSearchItem; +use super::cursor_from_thread_search_item; +use crate::ThreadSortKey; + +#[test] +fn recency_cursor_includes_thread_id_tie_breaker() { + let thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000123") + .expect("thread ID should parse"); + let item = ThreadSearchItem { + item: ThreadItem { + thread_id: Some(thread_id), + recency_at: Some("2026-01-27T12:34:56Z".to_string()), + ..Default::default() + }, + snippet: String::new(), + }; + + let cursor = cursor_from_thread_search_item(&item, ThreadSortKey::RecencyAt) + .expect("cursor should build"); + + assert_eq!( + serde_json::to_string(&cursor).expect("cursor should serialize"), + format!("\"2026-01-27T12:34:56Z|{thread_id}\"") + ); +} diff --git a/codex-rs/thread-store/src/local/unarchive_thread.rs b/codex-rs/thread-store/src/local/unarchive_thread.rs index ad41db69a..ee25b4342 100644 --- a/codex-rs/thread-store/src/local/unarchive_thread.rs +++ b/codex-rs/thread-store/src/local/unarchive_thread.rs @@ -196,5 +196,6 @@ mod tests { .expect("thread metadata should exist"); assert_eq!(updated.rollout_path, restored_path); assert_eq!(updated.archived_at, None); + assert_eq!(updated.recency_at, metadata.recency_at); } } diff --git a/codex-rs/thread-store/src/local/update_thread_metadata.rs b/codex-rs/thread-store/src/local/update_thread_metadata.rs index 820cb6891..d1c76ad32 100644 --- a/codex-rs/thread-store/src/local/update_thread_metadata.rs +++ b/codex-rs/thread-store/src/local/update_thread_metadata.rs @@ -204,6 +204,7 @@ async fn apply_metadata_update( .map_err(|err| ThreadStoreError::Internal { message: format!("failed to read thread metadata for {thread_id}: {err}"), })?; + let advance_recency_at = patch.advance_recency_at; if existing.is_none() && rollout_path.is_none() { let resolved = resolve_rollout_path(store, thread_id, include_archived).await?; rollout_path_archived = resolved.archived; @@ -260,6 +261,11 @@ async fn apply_metadata_update( if let Some(updated_at) = patch.updated_at { metadata.updated_at = updated_at; } + if existing.is_none() + && let Some(recency_at) = advance_recency_at + { + metadata.recency_at = recency_at; + } if let Some(source) = patch.source { metadata.source = enum_to_string(&source); } @@ -310,6 +316,18 @@ async fn apply_metadata_update( .map_err(|err| ThreadStoreError::Internal { message: format!("failed to update thread metadata for {thread_id}: {err}"), })?; + if existing.is_some() + && let Some(recency_at) = advance_recency_at + { + state_db + .touch_thread_recency_at(thread_id, recency_at) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!( + "failed to advance thread recency_at for {thread_id}: {err}" + ), + })?; + } if let Some(memory_mode) = patch.memory_mode { state_db .set_thread_memory_mode(thread_id, memory_mode_as_str(memory_mode)) diff --git a/codex-rs/thread-store/src/thread_metadata_sync.rs b/codex-rs/thread-store/src/thread_metadata_sync.rs index d4bd8edb7..6705f4574 100644 --- a/codex-rs/thread-store/src/thread_metadata_sync.rs +++ b/codex-rs/thread-store/src/thread_metadata_sync.rs @@ -156,7 +156,10 @@ impl ThreadMetadataSync { let affects_metadata = items .iter() .any(codex_state::rollout_item_affects_thread_metadata); - let update = if affects_metadata { + let advances_recency = items + .iter() + .any(|item| matches!(item, RolloutItem::EventMsg(EventMsg::TurnStarted(_)))); + let mut update = if affects_metadata { self.observe_items(items)? } else { Some(thread_updated_at_touch()) @@ -164,6 +167,9 @@ impl ThreadMetadataSync { let Some(update) = update else { return Ok(None); }; + if advances_recency { + update.advance_recency_at = Some(Utc::now()); + } self.merge_pending_update(Some(update)); if !affects_metadata && !self @@ -367,6 +373,7 @@ fn update_has_metadata_facts(update: &ThreadMetadataPatch) -> bool { || update.model.is_some() || update.reasoning_effort.is_some() || update.created_at.is_some() + || update.advance_recency_at.is_some() || update.source.is_some() || update.thread_source.is_some() || update.agent_nickname.is_some() @@ -399,6 +406,7 @@ mod tests { use codex_protocol::protocol::ThreadGoal; use codex_protocol::protocol::ThreadGoalStatus; use codex_protocol::protocol::ThreadGoalUpdatedEvent; + use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use pretty_assertions::assert_eq; @@ -522,6 +530,27 @@ mod tests { ); } + #[test] + fn turn_start_advances_recency_at_without_changing_updated_at_behavior() { + let thread_id = ThreadId::new(); + let mut sync = ThreadMetadataSync::for_resume(&resume_params(thread_id, Vec::new())); + + let update = sync + .observe_appended_items(&[RolloutItem::EventMsg(EventMsg::TurnStarted( + TurnStartedEvent { + turn_id: "turn-1".to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + }, + ))]) + .expect("turn start metadata update"); + + assert!(update.patch.updated_at.is_some()); + assert!(update.patch.advance_recency_at.is_some()); + } + #[test] fn resume_history_waits_for_append_before_flushing_metadata() { let thread_id = ThreadId::new(); diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index bb8bf2f6d..b969c6b95 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -160,6 +160,8 @@ pub enum ThreadSortKey { CreatedAt, /// Sort by the thread last-update timestamp. UpdatedAt, + /// Sort by the thread's product recency timestamp. + RecencyAt, } /// The direction to use when listing stored threads. @@ -397,6 +399,8 @@ pub struct StoredThread { pub created_at: DateTime, /// Thread last-update timestamp. pub updated_at: DateTime, + /// Thread product-recency timestamp. + pub recency_at: DateTime, /// Thread archive timestamp, if archived. pub archived_at: Option>, /// Working directory captured for the thread. @@ -504,6 +508,8 @@ pub struct ThreadMetadataPatch { pub created_at: Option>, /// Last update timestamp for this metadata observation. pub updated_at: Option>, + /// Advance product recency to at least this timestamp. + pub advance_recency_at: Option>, /// Session source. pub source: Option, /// Optional analytics source classification. @@ -586,6 +592,9 @@ impl ThreadMetadataPatch { if next.updated_at.is_some() { self.updated_at = next.updated_at; } + if next.advance_recency_at.is_some() { + self.advance_recency_at = next.advance_recency_at; + } if next.source.is_some() { self.source = next.source; } @@ -639,6 +648,7 @@ impl ThreadMetadataPatch { && self.reasoning_effort.is_none() && self.created_at.is_none() && self.updated_at.is_none() + && self.advance_recency_at.is_none() && self.source.is_none() && self.thread_source.is_none() && self.agent_nickname.is_none() diff --git a/codex-rs/tui/src/app/loaded_threads.rs b/codex-rs/tui/src/app/loaded_threads.rs index aba08f08d..6bd260c0b 100644 --- a/codex-rs/tui/src/app/loaded_threads.rs +++ b/codex-rs/tui/src/app/loaded_threads.rs @@ -136,6 +136,7 @@ mod tests { model_provider: "openai".to_string(), created_at: 0, updated_at: 0, + recency_at: Some(0), status: ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp").abs(), diff --git a/codex-rs/tui/src/app/tests.rs b/codex-rs/tui/src/app/tests.rs index 87538251a..d8a29d4ee 100644 --- a/codex-rs/tui/src/app/tests.rs +++ b/codex-rs/tui/src/app/tests.rs @@ -2881,6 +2881,7 @@ async fn inactive_thread_started_notification_initializes_replay_session() -> Re model_provider: "agent-provider".to_string(), created_at: 1, updated_at: 2, + recency_at: Some(2), status: codex_app_server_protocol::ThreadStatus::Idle, path: Some(rollout_path.clone()), cwd: test_path_buf("/tmp/agent").abs(), @@ -2973,6 +2974,7 @@ async fn inactive_thread_started_notification_preserves_primary_model_when_path_ model_provider: "agent-provider".to_string(), created_at: 1, updated_at: 2, + recency_at: Some(2), status: codex_app_server_protocol::ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp/agent").abs(), @@ -3032,6 +3034,7 @@ async fn thread_read_session_state_does_not_reuse_primary_permission_profile() { model_provider: "read-provider".to_string(), created_at: 1, updated_at: 2, + recency_at: Some(2), status: codex_app_server_protocol::ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp/read").abs(), @@ -5643,6 +5646,7 @@ async fn thread_rollback_response_discards_queued_active_thread_events() { model_provider: "openai".to_string(), created_at: 0, updated_at: 0, + recency_at: Some(0), status: codex_app_server_protocol::ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp/project").abs(), diff --git a/codex-rs/tui/src/app/thread_session_state.rs b/codex-rs/tui/src/app/thread_session_state.rs index bcc172ef9..ceafb71db 100644 --- a/codex-rs/tui/src/app/thread_session_state.rs +++ b/codex-rs/tui/src/app/thread_session_state.rs @@ -417,6 +417,7 @@ mod tests { model_provider: "read-provider".to_string(), created_at: 1, updated_at: 2, + recency_at: Some(2), status: codex_app_server_protocol::ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp/read").abs(), diff --git a/codex-rs/tui/src/app_server_session.rs b/codex-rs/tui/src/app_server_session.rs index 5e36fb1c9..4049ca14a 100644 --- a/codex-rs/tui/src/app_server_session.rs +++ b/codex-rs/tui/src/app_server_session.rs @@ -2319,6 +2319,7 @@ mod tests { model_provider: "openai".to_string(), created_at: 1, updated_at: 2, + recency_at: Some(2), status: ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp/project").abs(), diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index 18313bcca..08191eaf2 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -608,7 +608,7 @@ fn spawn_app_server_page_loader( fn sort_key_label(sort_key: ThreadSortKey) -> &'static str { match sort_key { ThreadSortKey::CreatedAt => "Created", - ThreadSortKey::UpdatedAt => "Updated", + ThreadSortKey::UpdatedAt | ThreadSortKey::RecencyAt => "Updated", } } @@ -1614,7 +1614,7 @@ impl PickerState { fn toggle_sort_key(&mut self) { self.sort_key = match self.sort_key { ThreadSortKey::CreatedAt => ThreadSortKey::UpdatedAt, - ThreadSortKey::UpdatedAt => ThreadSortKey::CreatedAt, + ThreadSortKey::UpdatedAt | ThreadSortKey::RecencyAt => ThreadSortKey::CreatedAt, }; self.start_initial_load(); } @@ -2613,7 +2613,7 @@ fn render_dense_session_lines( let updated = format_relative_time(reference, row.updated_at.or(row.created_at)); let date = match state.sort_key { ThreadSortKey::CreatedAt => created, - ThreadSortKey::UpdatedAt => updated, + ThreadSortKey::UpdatedAt | ThreadSortKey::RecencyAt => updated, }; let mut lines = vec![dense_summary_line(DenseSummaryInput { marker, @@ -2742,7 +2742,7 @@ fn render_footer_lines( ) -> Vec> { let date = match sort_key { ThreadSortKey::CreatedAt => created, - ThreadSortKey::UpdatedAt => updated, + ThreadSortKey::UpdatedAt | ThreadSortKey::RecencyAt => updated, }; let mut parts = vec![FooterPart::Date(date.to_string())]; if show_cwd { @@ -5728,6 +5728,7 @@ session_picker_view = "dense" model_provider: String::from("openai"), created_at: 1, updated_at: 2, + recency_at: Some(2), status: codex_app_server_protocol::ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp").abs(), @@ -5763,6 +5764,7 @@ session_picker_view = "dense" model_provider: String::from("openai"), created_at: 1, updated_at: 2, + recency_at: Some(2), status: codex_app_server_protocol::ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp").abs(), @@ -5832,6 +5834,7 @@ session_picker_view = "dense" model_provider: String::from("openai"), created_at: 1, updated_at: 2, + recency_at: Some(2), status: codex_app_server_protocol::ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp").abs(), @@ -5890,6 +5893,7 @@ session_picker_view = "dense" model_provider: String::from("openai"), created_at: 1, updated_at: 2, + recency_at: Some(2), status: codex_app_server_protocol::ThreadStatus::Idle, path: None, cwd: test_path_buf("/tmp").abs(),