From 5267e805fb830891c0b23376bcd9cbd382c3473c Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Fri, 26 Jun 2026 09:12:42 -0700 Subject: [PATCH] feat(app-server): add history_mode to thread (#29927) ## Description This PR adds a new `historyMode = "legacy" | "paginated"` to `Thread`. This will be stored in `SessionMeta` in the JSONL rollout file and as a new column in the SQLite thread_metadata table, and exposed on `thread/start` and on the `Thread` object in app-server. ## What changed - Added canonical `ThreadHistoryMode` with `legacy` and `paginated`, defaulting old and new SessionMeta to `legacy`. - Carried `history_mode` through core session config, ThreadStore stored metadata, local/in-memory stores, rollout metadata extraction, and the existing SQLite `threads` table. - Added experimental `historyMode` to app-server v2 `Thread` and `thread/start`. - Made paginated stored threads metadata-discoverable but unsupported for legacy full-history reads, `load_history`, live resume, and create paths. - Regenerated app-server schema fixtures and added protocol/state/thread-store/app-server coverage for persistence and fail-closed behavior. ## Compatibility floor Because users may be running various versions of Codex binaries on the same machine (TUI, Codex App, etc.), we will need to establish a compatibility floor for upcoming paginated threads, which will change how thread storage reads and writes work. The overall plan here: ``` Release N: - Add historyMode to SessionMeta / Thread / SQLite metadata. - Teach binaries to understand paginated threads. - If a binary sees `historyMode="paginated"` but does not support the paginated contract, it refuses to resume/mutate the thread. - Default remains `"legacy"`. Release N+1: - First-party clients start opting into paginated threads where appropriate. - Internal dogfood / staged rollout. - Measure old-client usage and paginated-thread unsupported errors. Release N+2: - Only after Release N+ is overwhelmingly deployed, make paginated the default. - Accept that a small tail of N-1-or-older binaries may not understand paginated threads. ``` The important behavior change is fail-closed handling for a binary that encounters a persisted `paginated` thread before it knows how to fully support paginated history. In app-server, if a thread is `paginated`, we will: - allow metadata-only discovery paths like `thread/list` and `thread/read(includeTurns=false)`, so clients can still see the thread and inspect its `historyMode` - reject legacy full-history/live-thread paths like `thread/read(includeTurns=true)` and `thread/resume` with an unsupported JSON-RPC error - avoid silently treating an unknown or future `historyMode` as `legacy` Under the hood, the ThreadStore layer also rejects legacy operations that would need to load or replay the full thread history for a paginated thread. That gives us the behavior we want for Release N: future paginated threads are visible, but this binary fails closed instead of trying to operate on them as if they were legacy threads. --- .../analytics/src/analytics_client_tests.rs | 1 + codex-rs/analytics/src/client_tests.rs | 1 + .../schema/json/ClientRequest.json | 7 + .../schema/json/ServerNotification.json | 7 + .../codex_app_server_protocol.schemas.json | 7 + .../codex_app_server_protocol.v2.schemas.json | 7 + .../schema/json/v2/ThreadForkResponse.json | 7 + .../schema/json/v2/ThreadListResponse.json | 7 + .../json/v2/ThreadMetadataUpdateResponse.json | 7 + .../schema/json/v2/ThreadReadResponse.json | 7 + .../schema/json/v2/ThreadResumeResponse.json | 7 + .../json/v2/ThreadRollbackResponse.json | 7 + .../schema/json/v2/ThreadStartParams.json | 7 + .../schema/json/v2/ThreadStartResponse.json | 7 + .../json/v2/ThreadStartedNotification.json | 7 + .../json/v2/ThreadUnarchiveResponse.json | 7 + .../schema/typescript/v2/ThreadHistoryMode.ts | 5 + .../schema/typescript/v2/index.ts | 1 + .../src/protocol/common.rs | 2 + .../src/protocol/v2/tests.rs | 1 + .../src/protocol/v2/thread.rs | 5 + .../src/protocol/v2/thread_data.rs | 32 +++ .../app-server/src/bespoke_event_handling.rs | 1 + codex-rs/app-server/src/request_processors.rs | 2 + .../external_agent_session_import.rs | 2 + .../request_processors/thread_processor.rs | 14 ++ .../thread_processor_tests.rs | 2 + .../thread_resume_redaction.rs | 1 + .../src/request_processors/thread_summary.rs | 1 + codex-rs/app-server/src/thread_status.rs | 1 + codex-rs/app-server/tests/common/rollout.rs | 2 + .../tests/suite/conversation_summary.rs | 1 + .../tests/suite/v2/remote_thread_store.rs | 1 + .../app-server/tests/suite/v2/skills_list.rs | 1 + .../app-server/tests/suite/v2/thread_read.rs | 137 ++++++++++++ .../tests/suite/v2/thread_resume.rs | 1 + .../app-server/tests/suite/v2/thread_start.rs | 50 +++++ .../tests/suite/v2/thread_unarchive.rs | 1 + codex-rs/core/src/agent/control_tests.rs | 2 + codex-rs/core/src/codex_delegate.rs | 1 + codex-rs/core/src/codex_thread.rs | 2 + .../core/src/personality_migration_tests.rs | 1 + codex-rs/core/src/realtime_context_tests.rs | 1 + codex-rs/core/src/session/mod.rs | 7 + codex-rs/core/src/session/session.rs | 5 + codex-rs/core/src/session/tests.rs | 10 + .../core/src/session/tests/guardian_tests.rs | 1 + .../core/src/session_rollout_init_error.rs | 8 + codex-rs/core/src/thread_manager.rs | 12 + codex-rs/core/src/thread_manager_tests.rs | 6 + codex-rs/core/tests/common/test_codex.rs | 1 + codex-rs/core/tests/suite/agents_md.rs | 2 + .../core/tests/suite/personality_migration.rs | 2 + codex-rs/core/tests/suite/sqlite_state.rs | 1 + .../tests/suite/subagent_notifications.rs | 1 + codex-rs/exec/src/lib_tests.rs | 2 + codex-rs/memories/write/src/runtime.rs | 1 + codex-rs/protocol/src/protocol.rs | 109 ++++++++++ codex-rs/rollout/src/compression_tests.rs | 1 + codex-rs/rollout/src/list.rs | 23 +- codex-rs/rollout/src/metadata.rs | 1 + codex-rs/rollout/src/metadata_tests.rs | 39 ++++ codex-rs/rollout/src/recorder.rs | 38 ++++ codex-rs/rollout/src/recorder_tests.rs | 43 ++++ codex-rs/rollout/src/session_index_tests.rs | 1 + codex-rs/rollout/src/state_db_tests.rs | 1 + codex-rs/rollout/src/tests.rs | 47 ++++ .../migrations/0040_threads_history_mode.sql | 1 + codex-rs/state/src/extract.rs | 4 + codex-rs/state/src/model/thread_metadata.rs | 23 ++ codex-rs/state/src/runtime/memories.rs | 25 ++- codex-rs/state/src/runtime/test_support.rs | 3 + codex-rs/state/src/runtime/threads.rs | 38 +++- codex-rs/thread-store/src/error.rs | 12 + codex-rs/thread-store/src/in_memory.rs | 205 +++++++++++++++++- .../thread-store/src/local/create_thread.rs | 3 + codex-rs/thread-store/src/local/helpers.rs | 1 + .../thread-store/src/local/list_threads.rs | 2 + .../thread-store/src/local/live_writer.rs | 27 ++- codex-rs/thread-store/src/local/mod.rs | 104 ++++++++- .../thread-store/src/local/read_thread.rs | 86 ++++++-- .../thread-store/src/local/test_support.rs | 17 ++ .../src/local/update_thread_metadata.rs | 71 ++++++ codex-rs/thread-store/src/store.rs | 9 + .../thread-store/src/thread_metadata_sync.rs | 3 + codex-rs/thread-store/src/types.rs | 46 ++++ codex-rs/tui/src/app/loaded_threads.rs | 1 + codex-rs/tui/src/app/tests.rs | 4 + codex-rs/tui/src/app/thread_session_state.rs | 1 + codex-rs/tui/src/app_server_session.rs | 1 + codex-rs/tui/src/resume_picker.rs | 4 + 91 files changed, 1385 insertions(+), 39 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/ThreadHistoryMode.ts create mode 100644 codex-rs/state/migrations/0040_threads_history_mode.sql diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 4451d39ed..0f759fd93 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -198,6 +198,7 @@ fn sample_thread_with_metadata( parent_thread_id, preview: "first prompt".to_string(), ephemeral, + history_mode: Default::default(), model_provider: "openai".to_string(), created_at: 1, updated_at: 2, diff --git a/codex-rs/analytics/src/client_tests.rs b/codex-rs/analytics/src/client_tests.rs index bbc5b5f0b..ac50d6ebc 100644 --- a/codex-rs/analytics/src/client_tests.rs +++ b/codex-rs/analytics/src/client_tests.rs @@ -283,6 +283,7 @@ fn sample_thread(thread_id: &str) -> Thread { parent_thread_id: None, preview: "first prompt".to_string(), ephemeral: false, + history_mode: Default::default(), model_provider: "openai".to_string(), created_at: 1, updated_at: 2, diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index acfffc370..8c1d700ef 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3747,6 +3747,13 @@ ], "type": "string" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadInjectItemsParams": { "properties": { "items": { diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 43fb66182..23dae42a9 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -3917,6 +3917,13 @@ ], "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 277421a85..f1d6a1af8 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -17512,6 +17512,13 @@ "title": "ThreadGoalUpdatedNotification", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 39c95053d..9983b7925 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -15291,6 +15291,13 @@ "title": "ThreadGoalUpdatedNotification", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json index 3ca62f17f..5d20d64e6 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json @@ -1188,6 +1188,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json index a81f64312..e5cbc2e60 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json @@ -995,6 +995,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json index d007b32d2..cdd1d47a6 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json @@ -995,6 +995,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json index f45efe370..613dd9648 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json @@ -995,6 +995,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json index 09dd3fedb..ee1ca8391 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json @@ -1188,6 +1188,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json index 8083b52d4..793f11c22 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json @@ -995,6 +995,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json index 50c0a9cf7..3a23d13d2 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartParams.json @@ -241,6 +241,13 @@ ], "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadSource": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json index b33a64a20..9d1659869 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json @@ -1188,6 +1188,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json index 9040457f2..a56d59c01 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json @@ -995,6 +995,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json index d4e9fea61..6cc9b44cb 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json @@ -995,6 +995,13 @@ "description": "Extra app-server data for a thread.", "type": "object" }, + "ThreadHistoryMode": { + "enum": [ + "legacy", + "paginated" + ], + "type": "string" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadHistoryMode.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadHistoryMode.ts new file mode 100644 index 000000000..db0f2d824 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadHistoryMode.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadHistoryMode = "legacy" | "paginated"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 82b74711c..2833e6e52 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -410,6 +410,7 @@ export type { ThreadGoalSetParams } from "./ThreadGoalSetParams"; export type { ThreadGoalSetResponse } from "./ThreadGoalSetResponse"; export type { ThreadGoalStatus } from "./ThreadGoalStatus"; export type { ThreadGoalUpdatedNotification } from "./ThreadGoalUpdatedNotification"; +export type { ThreadHistoryMode } from "./ThreadHistoryMode"; export type { ThreadInjectItemsParams } from "./ThreadInjectItemsParams"; export type { ThreadInjectItemsResponse } from "./ThreadInjectItemsResponse"; export type { ThreadItem } from "./ThreadItem"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 3120007e5..7b1503557 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -2580,6 +2580,7 @@ mod tests { parent_thread_id: None, preview: "first prompt".to_string(), ephemeral: true, + history_mode: Default::default(), model_provider: "openai".to_string(), created_at: 1, updated_at: 2, @@ -2630,6 +2631,7 @@ mod tests { "parentThreadId": null, "preview": "first prompt", "ephemeral": true, + "historyMode": "legacy", "modelProvider": "openai", "createdAt": 1, "updatedAt": 2, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs index c6d460c72..ddd2afe8f 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs @@ -175,6 +175,7 @@ fn thread_resume_response_round_trips_initial_turns_page() { parent_thread_id: None, preview: String::new(), ephemeral: false, + history_mode: Default::default(), model_provider: "openai".to_string(), created_at: 1, updated_at: 1, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index e712ab0ad..73335fce5 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -4,6 +4,7 @@ use super::AskForApproval; use super::SandboxMode; use super::SandboxPolicy; use super::Thread; +use super::ThreadHistoryMode; use super::ThreadItem; use super::ThreadSource; use super::Turn; @@ -105,6 +106,10 @@ pub struct ThreadStartParams { pub multi_agent_mode: Option, #[ts(optional = nullable)] pub ephemeral: Option, + /// Persisted thread history contract to use for this new thread. + #[experimental("thread/start.historyMode")] + #[ts(optional = nullable)] + pub history_mode: Option, #[ts(optional = nullable)] pub session_start_source: Option, /// Optional client-supplied analytics source classification for this thread. diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs index 3338513df..2393efb25 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread_data.rs @@ -5,6 +5,7 @@ use super::TurnStatus; use codex_experimental_api_macros::ExperimentalApi; use codex_protocol::protocol::SessionSource as CoreSessionSource; use codex_protocol::protocol::SubAgentSource as CoreSubAgentSource; +use codex_protocol::protocol::ThreadHistoryMode as CoreThreadHistoryMode; use codex_protocol::protocol::ThreadSource as CoreThreadSource; use codex_utils_absolute_path::AbsolutePathBuf; use schemars::JsonSchema; @@ -64,6 +65,33 @@ impl From for CoreSessionSource { } } +#[derive(Default, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "lowercase")] +#[ts(rename_all = "lowercase", export_to = "v2/")] +pub enum ThreadHistoryMode { + #[default] + Legacy, + Paginated, +} + +impl From for ThreadHistoryMode { + fn from(value: CoreThreadHistoryMode) -> Self { + match value { + CoreThreadHistoryMode::Legacy => Self::Legacy, + CoreThreadHistoryMode::Paginated => Self::Paginated, + } + } +} + +impl From for CoreThreadHistoryMode { + fn from(value: ThreadHistoryMode) -> Self { + match value { + ThreadHistoryMode::Legacy => Self::Legacy, + ThreadHistoryMode::Paginated => Self::Paginated, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, TS)] #[serde(try_from = "String", into = "String")] #[ts(type = "string")] @@ -155,6 +183,10 @@ pub struct Thread { pub preview: String, /// Whether the thread is ephemeral and should not be materialized on disk. pub ephemeral: bool, + /// Persisted thread history contract selected when this thread was created. + #[experimental("thread.historyMode")] + #[serde(default)] + pub history_mode: ThreadHistoryMode, /// Model provider used for this thread (for example, 'openai'). pub model_provider: String, /// Unix timestamp (in seconds) when the thread was created. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index bcba603bc..c63103161 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -2287,6 +2287,7 @@ mod tests { cwd: test_path_buf("/tmp").abs().into(), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + history_mode: Default::default(), thread_source: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 57a65fd31..39c4b79dc 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -206,6 +206,8 @@ use codex_app_server_protocol::ThreadGoalSetResponse; use codex_app_server_protocol::ThreadGoalStatus; use codex_app_server_protocol::ThreadGoalUpdatedNotification; use codex_app_server_protocol::ThreadHistoryBuilder; +#[cfg(test)] +use codex_app_server_protocol::ThreadHistoryMode; use codex_app_server_protocol::ThreadIncrementElicitationParams; use codex_app_server_protocol::ThreadIncrementElicitationResponse; use codex_app_server_protocol::ThreadInjectItemsParams; diff --git a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs index 15b6e3a43..e0a53a9c9 100644 --- a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs +++ b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs @@ -15,6 +15,7 @@ use codex_models_manager::manager::RefreshStrategy; use codex_protocol::ThreadId; use codex_protocol::models::BaseInstructions; use codex_protocol::protocol::MultiAgentVersion; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadMemoryMode; use codex_rollout::is_persisted_rollout_item; use codex_thread_store::AppendThreadItemsParams; @@ -221,6 +222,7 @@ impl ExternalAgentSessionImporter { dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: Some(MultiAgentVersion::V1), + history_mode: ThreadHistoryMode::Legacy, initial_window_id: uuid::Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: Some(cwd.clone()), diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 8232d68fe..d5302d9c4 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -5,6 +5,7 @@ use codex_extension_api::ExtensionDataInit; use codex_protocol::config_types::MultiAgentMode; use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_DANGER_FULL_ACCESS; use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_WORKSPACE; +use codex_protocol::protocol::ThreadHistoryMode; const THREAD_LIST_DEFAULT_LIMIT: usize = 25; const THREAD_LIST_MAX_LIMIT: usize = 100; @@ -926,6 +927,7 @@ impl ThreadRequestProcessor { personality, multi_agent_mode: _multi_agent_mode, ephemeral, + history_mode, session_start_source, thread_source, environments, @@ -980,6 +982,7 @@ impl ThreadRequestProcessor { typesafe_overrides, dynamic_tools, selected_capability_roots.unwrap_or_default(), + history_mode.map(Into::into), session_start_source, thread_source.map(Into::into), environment_selections, @@ -1055,6 +1058,7 @@ impl ThreadRequestProcessor { typesafe_overrides: ConfigOverrides, dynamic_tools: Option>, selected_capability_roots: Vec, + history_mode: Option, session_start_source: Option, thread_source: Option, environments: Option>, @@ -1173,6 +1177,7 @@ impl ThreadRequestProcessor { codex_app_server_protocol::ThreadStartSource::Startup => InitialHistory::New, codex_app_server_protocol::ThreadStartSource::Clear => InitialHistory::Cleared, }, + history_mode, session_source: None, thread_source, dynamic_tools, @@ -1190,6 +1195,7 @@ impl ThreadRequestProcessor { .await .map_err(|err| match err { CodexErr::InvalidRequest(message) => invalid_request(message), + CodexErr::UnsupportedOperation(message) => method_not_found(message), err => internal_error(format!("error creating thread: {err}")), })?; let session_telemetry = thread.session_telemetry(); @@ -2299,6 +2305,9 @@ impl ThreadRequestProcessor { Err(ThreadStoreError::InvalidRequest { message }) => { Err(ThreadReadViewError::InvalidRequest(message)) } + Err(ThreadStoreError::Unsupported { operation }) => { + Err(ThreadReadViewError::Unsupported(operation)) + } Err(err) => Err(ThreadReadViewError::Internal(format!( "failed to read thread: {err}" ))), @@ -2501,6 +2510,9 @@ impl ThreadRequestProcessor { Err(ThreadStoreError::InvalidRequest { message }) => { return Err(ThreadReadViewError::InvalidRequest(message)); } + Err(ThreadStoreError::Unsupported { operation }) => { + return Err(ThreadReadViewError::Unsupported(operation)); + } Err(err) => { return Err(ThreadReadViewError::Internal(format!( "failed to read thread: {err}" @@ -4254,6 +4266,7 @@ pub(crate) fn thread_from_stored_thread( parent_thread_id: thread.parent_thread_id.map(|id| id.to_string()), preview: thread.preview, ephemeral: false, + history_mode: thread.history_mode.into(), model_provider: if thread.model_provider.is_empty() { fallback_provider.to_string() } else { @@ -4465,6 +4478,7 @@ fn build_thread_from_snapshot( parent_thread_id: config_snapshot.parent_thread_id.map(|id| id.to_string()), preview: String::new(), ephemeral: config_snapshot.ephemeral, + history_mode: config_snapshot.history_mode.into(), model_provider: config_snapshot.model_provider_id.clone(), created_at: now, updated_at: now, diff --git a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs index 0d447011b..8c0d59efe 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs @@ -483,6 +483,7 @@ mod thread_processor_behavior_tests { cwd: PathBuf::from("/tmp"), cli_version: "0.0.0".to_string(), source: SessionSource::Cli, + history_mode: Default::default(), thread_source: Some(codex_protocol::protocol::ThreadSource::User), agent_nickname: None, agent_role: None, @@ -777,6 +778,7 @@ mod thread_processor_behavior_tests { }, }, session_source: SessionSource::Cli, + history_mode: Default::default(), forked_from_thread_id: None, parent_thread_id: None, thread_source: None, diff --git a/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs b/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs index f75360727..90d25ff59 100644 --- a/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs +++ b/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs @@ -195,6 +195,7 @@ mod tests { parent_thread_id: None, preview: "preview".to_string(), ephemeral: false, + history_mode: Default::default(), model_provider: "mock_provider".to_string(), created_at: 0, updated_at: 0, diff --git a/codex-rs/app-server/src/request_processors/thread_summary.rs b/codex-rs/app-server/src/request_processors/thread_summary.rs index 2bacaf644..37c77a198 100644 --- a/codex-rs/app-server/src/request_processors/thread_summary.rs +++ b/codex-rs/app-server/src/request_processors/thread_summary.rs @@ -320,6 +320,7 @@ pub(crate) fn summary_to_thread( parent_thread_id: None, preview, ephemeral: false, + history_mode: ThreadHistoryMode::Legacy, model_provider, created_at: created_at.map(|dt| dt.timestamp()).unwrap_or(0), updated_at: updated_at.map(|dt| dt.timestamp()).unwrap_or(0), diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index 32b79e1f6..a29b64978 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -895,6 +895,7 @@ mod tests { parent_thread_id: None, preview: String::new(), ephemeral: false, + history_mode: Default::default(), model_provider: "mock-provider".to_string(), created_at: 0, updated_at: 0, diff --git a/codex-rs/app-server/tests/common/rollout.rs b/codex-rs/app-server/tests/common/rollout.rs index 8d2567c01..c13f2355d 100644 --- a/codex-rs/app-server/tests/common/rollout.rs +++ b/codex-rs/app-server/tests/common/rollout.rs @@ -202,6 +202,7 @@ fn create_fake_rollout_with_source_and_parent_thread_id( dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }; @@ -291,6 +292,7 @@ pub fn create_fake_rollout_with_text_elements( dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }; diff --git a/codex-rs/app-server/tests/suite/conversation_summary.rs b/codex-rs/app-server/tests/suite/conversation_summary.rs index 63ad47326..64a0e6224 100644 --- a/codex-rs/app-server/tests/suite/conversation_summary.rs +++ b/codex-rs/app-server/tests/suite/conversation_summary.rs @@ -133,6 +133,7 @@ async fn get_conversation_summary_by_thread_id_reads_pathless_store_thread() -> dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: Default::default(), initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: None, diff --git a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs index 7e0db1b5b..2002b08a8 100644 --- a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs +++ b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs @@ -160,6 +160,7 @@ async fn thread_delete_with_non_local_thread_store_does_not_create_local_persist dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: Default::default(), initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: Some(codex_home.path().to_path_buf()), diff --git a/codex-rs/app-server/tests/suite/v2/skills_list.rs b/codex-rs/app-server/tests/suite/v2/skills_list.rs index 64b7fe050..f0bee7f27 100644 --- a/codex-rs/app-server/tests/suite/v2/skills_list.rs +++ b/codex-rs/app-server/tests/suite/v2/skills_list.rs @@ -897,6 +897,7 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<( personality: None, multi_agent_mode: None, ephemeral: None, + history_mode: None, session_start_source: None, thread_source: None, dynamic_tools: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 9f6776175..22adba37c 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -18,6 +18,7 @@ use codex_app_server_protocol::SessionSource; use codex_app_server_protocol::SortDirection; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadForkResponse; +use codex_app_server_protocol::ThreadHistoryMode; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadItemsListParams; use codex_app_server_protocol::ThreadListParams; @@ -196,6 +197,121 @@ async fn thread_read_can_include_turns() -> Result<()> { Ok(()) } +#[tokio::test] +async fn paginated_stored_thread_allows_metadata_discovery_and_rejects_legacy_history_paths() +-> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout_with_text_elements( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Vec::new(), + Some("mock_provider"), + /*git_info*/ None, + )?; + set_rollout_history_mode( + rollout_path(codex_home.path(), "2025-01-05T12-00-00", &conversation_id).as_path(), + ThreadHistoryMode::Paginated, + )?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let read_id = mcp + .send_thread_read_request(ThreadReadParams { + thread_id: conversation_id.clone(), + include_turns: false, + }) + .await?; + let read_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(read_id)), + ) + .await??; + let ThreadReadResponse { thread } = to_response::(read_resp)?; + assert_eq!(thread.history_mode, ThreadHistoryMode::Paginated); + assert!(thread.turns.is_empty()); + + let list_id = mcp + .send_thread_list_request(ThreadListParams { + cursor: None, + limit: Some(50), + sort_key: None, + sort_direction: None, + model_providers: Some(vec!["mock_provider".to_string()]), + source_kinds: None, + archived: None, + cwd: None, + use_state_db_only: false, + search_term: None, + parent_thread_id: None, + ancestor_thread_id: None, + }) + .await?; + let list_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(list_id)), + ) + .await??; + let ThreadListResponse { data, .. } = to_response::(list_resp)?; + let listed = data + .iter() + .find(|thread| thread.id == conversation_id) + .expect("thread/list should include paginated thread"); + assert_eq!(listed.history_mode, ThreadHistoryMode::Paginated); + + let read_id = mcp + .send_thread_read_request(ThreadReadParams { + thread_id: conversation_id.clone(), + include_turns: true, + }) + .await?; + assert_paginated_threads_unsupported( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(read_id)), + ) + .await??, + ); + + let turns_list_id = mcp + .send_thread_turns_list_request(ThreadTurnsListParams { + thread_id: conversation_id.clone(), + cursor: None, + limit: None, + sort_direction: None, + items_view: None, + }) + .await?; + assert_paginated_threads_unsupported( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(turns_list_id)), + ) + .await??, + ); + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id, + ..Default::default() + }) + .await?; + assert_paginated_threads_unsupported( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(resume_id)), + ) + .await??, + ); + + Ok(()) +} + #[tokio::test] async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -1371,6 +1487,7 @@ async fn seed_pathless_store_thread( dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: Default::default(), initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: None, @@ -1411,6 +1528,26 @@ fn store_history_items() -> Vec { ))] } +fn set_rollout_history_mode(path: &Path, history_mode: ThreadHistoryMode) -> Result<()> { + let mut lines = std::fs::read_to_string(path)? + .lines() + .map(serde_json::from_str::) + .collect::, _>>()?; + lines[0]["payload"]["history_mode"] = serde_json::to_value(history_mode)?; + let contents = lines + .into_iter() + .map(|line| line.to_string()) + .collect::>() + .join("\n"); + std::fs::write(path, format!("{contents}\n"))?; + Ok(()) +} + +fn assert_paginated_threads_unsupported(err: JSONRPCError) { + assert_eq!(err.error.code, -32601); + assert_eq!(err.error.message, "paginated_threads is not supported yet"); +} + fn create_config_toml_with_thread_store(codex_home: &Path, store_id: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); std::fs::write( diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 67dd24921..a6e851f1d 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -2168,6 +2168,7 @@ stream_max_retries = 0 dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }; diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs index ac4b255de..f717dd001 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_start.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -15,6 +15,7 @@ use codex_app_server_protocol::McpServerStatusUpdatedNotification; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SandboxMode; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ThreadHistoryMode; use codex_app_server_protocol::ThreadSource; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; @@ -248,6 +249,11 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { Some(false), "new persistent threads should serialize `ephemeral: false`" ); + assert_eq!( + thread_json.get("historyMode").and_then(Value::as_str), + Some("legacy"), + "new threads should serialize `historyMode: legacy`" + ); assert_eq!( thread_json.get("threadSource").and_then(Value::as_str), Some("user"), @@ -308,6 +314,50 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_start_history_mode_accepts_legacy_and_rejects_paginated() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?; + + let mut mcp = TestAppServer::new_with_auto_env(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_thread_start_request_with_auto_env(ThreadStartParams { + history_mode: Some(ThreadHistoryMode::Legacy), + ..Default::default() + }) + .await?; + let response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response(response)?; + + assert_eq!(thread.history_mode, ThreadHistoryMode::Legacy); + + let request_id = mcp + .send_thread_start_request_with_auto_env(ThreadStartParams { + history_mode: Some(ThreadHistoryMode::Paginated), + ..Default::default() + }) + .await?; + let error: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + + assert_eq!(error.error.code, -32601); + assert_eq!( + error.error.message, + "paginated_threads is not supported yet" + ); + Ok(()) +} + #[tokio::test] async fn thread_start_accepts_absolute_runtime_workspace_roots() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs index 231ede47b..07999e165 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs @@ -220,6 +220,7 @@ async fn thread_unarchive_preserves_pathless_store_metadata() -> Result<()> { dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: Default::default(), initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: None, diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index acb7af377..9863bfb7e 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -1464,6 +1464,7 @@ async fn spawn_agent_fork_last_n_turns_drops_parent_startup_prefix_when_under_li config: harness.config.clone(), allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: None, dynamic_tools: Vec::new(), @@ -2253,6 +2254,7 @@ async fn spawn_thread_subagents_persist_parent_originator_across_new_and_truncat config: harness.config.clone(), allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: None, dynamic_tools: Vec::new(), diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index d25af5793..a671f0dd7 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -108,6 +108,7 @@ pub(crate) async fn run_codex_thread_interactive( code_mode_session_provider: parent_session.services.code_mode_service.session_provider(), extensions: Arc::clone(&parent_session.services.extensions), conversation_history, + requested_history_mode: None, session_source: SessionSource::SubAgent(subagent_source.clone()), forked_from_thread_id, parent_thread_id: Some(parent_session.thread_id), diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 90555db91..4a562092f 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -29,6 +29,7 @@ use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::Submission; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsageInfo; @@ -72,6 +73,7 @@ pub struct ThreadConfigSnapshot { pub personality: Option, pub collaboration_mode: CollaborationMode, pub session_source: SessionSource, + pub history_mode: ThreadHistoryMode, pub forked_from_thread_id: Option, pub parent_thread_id: Option, pub thread_source: Option, diff --git a/codex-rs/core/src/personality_migration_tests.rs b/codex-rs/core/src/personality_migration_tests.rs index 396d06564..80f5b3c02 100644 --- a/codex-rs/core/src/personality_migration_tests.rs +++ b/codex-rs/core/src/personality_migration_tests.rs @@ -61,6 +61,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, diff --git a/codex-rs/core/src/realtime_context_tests.rs b/codex-rs/core/src/realtime_context_tests.rs index e74d5b3e0..23f0b0c03 100644 --- a/codex-rs/core/src/realtime_context_tests.rs +++ b/codex-rs/core/src/realtime_context_tests.rs @@ -55,6 +55,7 @@ fn stored_thread(cwd: &str, title: &str, first_user_message: &str) -> StoredThre cwd: PathBuf::from(cwd), cli_version: "test".to_string(), source: SessionSource::Cli, + history_mode: Default::default(), thread_source: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 6a21a6be6..f45616eed 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -122,6 +122,7 @@ use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnContextItem; @@ -419,6 +420,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) code_mode_session_provider: Arc, pub(crate) extensions: Arc>, pub(crate) conversation_history: InitialHistory, + pub(crate) requested_history_mode: Option, pub(crate) session_source: SessionSource, pub(crate) forked_from_thread_id: Option, pub(crate) parent_thread_id: Option, @@ -510,6 +512,7 @@ impl Codex { code_mode_session_provider, extensions, conversation_history, + requested_history_mode, session_source, forked_from_thread_id, parent_thread_id, @@ -600,6 +603,9 @@ impl Codex { .await; let multi_agent_version = resolve_multi_agent_version(&conversation_history, inherited_multi_agent_version); + let history_mode = conversation_history.get_history_mode( + requested_history_mode.unwrap_or_else(|| thread_store.default_history_mode()), + ); config .validate_multi_agent_v2_config() .map_err(|err| CodexErr::InvalidRequest(err.to_string()))?; @@ -655,6 +661,7 @@ impl Codex { app_server_client_name: None, app_server_client_version: None, session_source, + history_mode, forked_from_thread_id, parent_thread_id, thread_source, diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index a9cc89db3..5e7046b14 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -16,6 +16,7 @@ use codex_protocol::config_types::ServiceTier; use codex_protocol::permissions::FileSystemPath; use codex_protocol::permissions::FileSystemSpecialPath; use codex_protocol::protocol::MultiAgentVersion; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnEnvironmentSelections; use std::sync::OnceLock; @@ -96,6 +97,8 @@ pub(crate) struct SessionConfiguration { pub(super) app_server_client_version: Option, /// Source of the session (cli, vscode, exec, mcp, ...) pub(super) session_source: SessionSource, + /// Persisted thread history contract selected when this thread was created. + pub(super) history_mode: ThreadHistoryMode, /// Immediate history source copied into this thread, when this thread was forked. pub(super) forked_from_thread_id: Option, /// Immediate control/spawn parent for this thread, when it has one. @@ -192,6 +195,7 @@ impl SessionConfiguration { personality: self.personality, collaboration_mode: self.collaboration_mode.clone(), session_source: self.session_source.clone(), + history_mode: self.history_mode, forked_from_thread_id: self.forked_from_thread_id, parent_thread_id: self.parent_thread_id, thread_source: self.thread_source.clone(), @@ -599,6 +603,7 @@ impl Session { dynamic_tools: session_configuration.dynamic_tools.clone(), selected_capability_roots: selected_capability_roots.clone(), multi_agent_version: initial_multi_agent_version, + history_mode: session_configuration.history_mode, initial_window_id: initial_auto_compact_window_ids .window_id .to_string(), diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 09fdcf224..93519cda9 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -3686,6 +3686,7 @@ async fn set_rate_limits_retains_previous_credits() { app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + history_mode: Default::default(), forked_from_thread_id: None, parent_thread_id: None, thread_source: None, @@ -3792,6 +3793,7 @@ async fn set_rate_limits_updates_plan_type_when_present() { app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + history_mode: Default::default(), forked_from_thread_id: None, parent_thread_id: None, thread_source: None, @@ -4050,6 +4052,7 @@ async fn attach_thread_persistence(session: &mut Session) -> PathBuf { dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: Default::default(), initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: Some(config.cwd.to_path_buf()), @@ -4323,6 +4326,7 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + history_mode: Default::default(), forked_from_thread_id: None, parent_thread_id: None, thread_source: None, @@ -5193,6 +5197,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() { app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + history_mode: Default::default(), forked_from_thread_id: None, parent_thread_id: None, thread_source: None, @@ -5324,6 +5329,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + history_mode: Default::default(), forked_from_thread_id: None, parent_thread_id: None, thread_source: None, @@ -5572,6 +5578,7 @@ async fn make_session_with_config_and_rx( app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + history_mode: Default::default(), forked_from_thread_id: None, parent_thread_id: None, thread_source: None, @@ -5679,6 +5686,7 @@ async fn make_session_with_history_source_and_agent_control_and_rx( app_server_client_name: None, app_server_client_version: None, session_source: session_source.clone(), + history_mode: Default::default(), forked_from_thread_id: None, parent_thread_id: None, thread_source: None, @@ -6931,6 +6939,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() { dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: Default::default(), initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: Some(config.cwd.to_path_buf()), @@ -7404,6 +7413,7 @@ where app_server_client_name: None, app_server_client_version: None, session_source: SessionSource::Exec, + history_mode: Default::default(), forked_from_thread_id: None, parent_thread_id: None, thread_source: None, diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index dc17af5a1..d13239d81 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -730,6 +730,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { code_mode_session_provider: Arc::new(codex_code_mode::InProcessCodeModeSessionProvider), extensions: codex_extension_api::empty_extension_registry(), conversation_history: InitialHistory::New, + requested_history_mode: None, session_source: SessionSource::SubAgent(SubAgentSource::Other( GUARDIAN_REVIEWER_NAME.to_string(), )), diff --git a/codex-rs/core/src/session_rollout_init_error.rs b/codex-rs/core/src/session_rollout_init_error.rs index 57e1e8e57..db71dfcd3 100644 --- a/codex-rs/core/src/session_rollout_init_error.rs +++ b/codex-rs/core/src/session_rollout_init_error.rs @@ -3,8 +3,16 @@ use std::path::Path; use crate::rollout::SESSIONS_SUBDIR; use codex_protocol::error::CodexErr; +use codex_thread_store::ThreadStoreError; pub(crate) fn map_session_init_error(err: &anyhow::Error, codex_home: &Path) -> CodexErr { + if let Some(ThreadStoreError::Unsupported { operation }) = err + .chain() + .find_map(|cause| cause.downcast_ref::()) + { + return CodexErr::UnsupportedOperation(format!("{operation} is not supported yet")); + } + if let Some(mapped) = err .chain() .filter_map(|cause| cause.downcast_ref::()) diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 2ca21c15a..4ea86cb33 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -56,6 +56,7 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; @@ -187,6 +188,7 @@ pub struct StartThreadOptions { pub config: Config, pub allow_provider_model_fallback: bool, pub initial_history: InitialHistory, + pub history_mode: Option, pub session_source: Option, pub thread_source: Option, pub dynamic_tools: Vec, @@ -647,6 +649,7 @@ impl ThreadManager { config, allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: None, dynamic_tools, @@ -682,6 +685,7 @@ impl ThreadManager { Box::pin(self.state.spawn_thread_with_source( options.config, options.initial_history, + options.history_mode, options.allow_provider_model_fallback, Arc::clone(&self.state.auth_manager), agent_control, @@ -778,6 +782,7 @@ impl ThreadManager { Box::pin(self.state.spawn_thread_with_source( config, initial_history, + /*history_mode*/ None, /*allow_provider_model_fallback*/ false, auth_manager, agent_control, @@ -848,6 +853,7 @@ impl ThreadManager { Box::pin(self.state.spawn_thread_with_source( config, initial_history, + /*history_mode*/ None, /*allow_provider_model_fallback*/ false, auth_manager, agent_control, @@ -1349,6 +1355,7 @@ impl ThreadManagerState { Box::pin(self.spawn_thread_with_source( config, InitialHistory::New, + /*history_mode*/ None, /*allow_provider_model_fallback*/ false, Arc::clone(&self.auth_manager), agent_control, @@ -1388,6 +1395,7 @@ impl ThreadManagerState { Box::pin(self.spawn_thread_with_source( config, initial_history, + /*history_mode*/ None, /*allow_provider_model_fallback*/ false, Arc::clone(&self.auth_manager), agent_control, @@ -1429,6 +1437,7 @@ impl ThreadManagerState { Box::pin(self.spawn_thread_with_source( config, initial_history, + /*history_mode*/ None, /*allow_provider_model_fallback*/ false, Arc::clone(&self.auth_manager), agent_control, @@ -1471,6 +1480,7 @@ impl ThreadManagerState { Box::pin(self.spawn_thread_with_source( config, initial_history, + /*history_mode*/ None, /*allow_provider_model_fallback*/ false, auth_manager, agent_control, @@ -1496,6 +1506,7 @@ impl ThreadManagerState { &self, config: Config, initial_history: InitialHistory, + history_mode: Option, allow_provider_model_fallback: bool, auth_manager: Arc, agent_control: AgentControl, @@ -1575,6 +1586,7 @@ impl ThreadManagerState { code_mode_session_provider: Arc::clone(&self.code_mode_session_provider), extensions: Arc::clone(&self.extensions), conversation_history: initial_history, + requested_history_mode: history_mode, session_source, forked_from_thread_id, parent_thread_id, diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index b33d95ac3..4f0da0c9c 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -472,6 +472,7 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() { config, allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: Some(SessionSource::Internal( InternalSessionSource::MemoryConsolidation, )), @@ -615,6 +616,7 @@ async fn start_thread_seeds_extension_data_for_mcp_and_lifecycle_contributors() config: config.clone(), allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: None, dynamic_tools: Vec::new(), @@ -631,6 +633,7 @@ async fn start_thread_seeds_extension_data_for_mcp_and_lifecycle_contributors() config: config.clone(), allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: None, dynamic_tools: Vec::new(), @@ -739,6 +742,7 @@ async fn selected_capability_roots_round_trip_through_fork() { git: None, }, )]), + history_mode: None, session_source: None, thread_source: None, dynamic_tools: Vec::new(), @@ -810,6 +814,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() { config: source_config, allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: None, dynamic_tools: Vec::new(), @@ -1094,6 +1099,7 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() { config: config.clone(), allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: Some(ThreadSource::User), dynamic_tools: Vec::new(), diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index 6b1126777..e0e82c37f 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -660,6 +660,7 @@ impl TestCodexBuilder { config: config.clone(), allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: None, dynamic_tools: Vec::new(), diff --git a/codex-rs/core/tests/suite/agents_md.rs b/codex-rs/core/tests/suite/agents_md.rs index 4e0ca794e..7c73b9060 100644 --- a/codex-rs/core/tests/suite/agents_md.rs +++ b/codex-rs/core/tests/suite/agents_md.rs @@ -488,6 +488,7 @@ async fn loads_user_instructions_without_a_primary_environment() -> Result<()> { config: test.config.clone(), allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: None, dynamic_tools: Vec::new(), @@ -694,6 +695,7 @@ async fn multi_environment_thread_loads_every_project_and_keeps_creation_snapsho config: test.config.clone(), allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: None, thread_source: None, dynamic_tools: Vec::new(), diff --git a/codex-rs/core/tests/suite/personality_migration.rs b/codex-rs/core/tests/suite/personality_migration.rs index 3dd0f3c01..0ca3e1b2a 100644 --- a/codex-rs/core/tests/suite/personality_migration.rs +++ b/codex-rs/core/tests/suite/personality_migration.rs @@ -77,6 +77,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, @@ -130,6 +131,7 @@ async fn write_rollout_with_meta_only(dir: &Path, thread_id: ThreadId) -> io::Re dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 1c2f08262..726d74ebf 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -374,6 +374,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, diff --git a/codex-rs/core/tests/suite/subagent_notifications.rs b/codex-rs/core/tests/suite/subagent_notifications.rs index 62a367a22..951fb92f1 100644 --- a/codex-rs/core/tests/suite/subagent_notifications.rs +++ b/codex-rs/core/tests/suite/subagent_notifications.rs @@ -763,6 +763,7 @@ async fn subagent_stop_replaces_stop_and_skips_internal_subagents() -> Result<() config: test.config.clone(), allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: Some(SessionSource::SubAgent(SubAgentSource::Review)), thread_source: None, dynamic_tools: Vec::new(), diff --git a/codex-rs/exec/src/lib_tests.rs b/codex-rs/exec/src/lib_tests.rs index 9f345eb33..97ded5052 100644 --- a/codex-rs/exec/src/lib_tests.rs +++ b/codex-rs/exec/src/lib_tests.rs @@ -340,6 +340,7 @@ fn turn_items_for_thread_returns_matching_turn_items() { parent_thread_id: None, preview: String::new(), ephemeral: false, + history_mode: Default::default(), model_provider: "openai".to_string(), created_at: 0, updated_at: 0, @@ -760,6 +761,7 @@ fn sample_thread_start_response() -> ThreadStartResponse { parent_thread_id: None, preview: String::new(), ephemeral: false, + history_mode: Default::default(), model_provider: "openai".to_string(), created_at: 0, updated_at: 0, diff --git a/codex-rs/memories/write/src/runtime.rs b/codex-rs/memories/write/src/runtime.rs index 73744e0a8..6c7ad3978 100644 --- a/codex-rs/memories/write/src/runtime.rs +++ b/codex-rs/memories/write/src/runtime.rs @@ -331,6 +331,7 @@ impl MemoryStartupContext { config, allow_provider_model_fallback: false, initial_history: InitialHistory::New, + history_mode: None, session_source: Some(SessionSource::Internal( InternalSessionSource::MemoryConsolidation, )), diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 491d64aa5..96a793bcd 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -678,6 +678,36 @@ pub enum ThreadMemoryMode { Disabled, } +#[derive(Serialize, Deserialize, Clone, Copy, Debug, Default, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "lowercase")] +#[ts(rename_all = "lowercase")] +pub enum ThreadHistoryMode { + #[default] + Legacy, + Paginated, +} + +impl ThreadHistoryMode { + pub const fn as_str(self) -> &'static str { + match self { + Self::Legacy => "legacy", + Self::Paginated => "paginated", + } + } +} + +impl FromStr for ThreadHistoryMode { + type Err = String; + + fn from_str(value: &str) -> Result { + match value { + "legacy" => Ok(Self::Legacy), + "paginated" => Ok(Self::Paginated), + _ => Err(format!("unknown thread history mode `{value}`")), + } + } +} + impl From> for Op { fn from(value: Vec) -> Self { Op::UserInput { @@ -2607,6 +2637,18 @@ impl InitialHistory { } } + pub fn get_history_mode(&self, default_history_mode: ThreadHistoryMode) -> ThreadHistoryMode { + match self { + InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => { + default_history_mode + } + InitialHistory::Resumed(_) => self + .get_resumed_session_meta() + .map(|meta| meta.history_mode) + .unwrap_or(default_history_mode), + } + } + pub fn get_latest_effective_multi_agent_mode(&self) -> Option { let items = match self { InitialHistory::New | InitialHistory::Cleared => return None, @@ -3033,6 +3075,8 @@ pub struct SessionMeta { pub selected_capability_roots: Vec, #[serde(skip_serializing_if = "Option::is_none")] pub memory_mode: Option, + #[serde(default)] + pub history_mode: ThreadHistoryMode, #[serde(skip_serializing_if = "Option::is_none")] pub multi_agent_version: Option, /// Initial context-window identity for consumers that tail rollout JSONL before compaction. @@ -3062,6 +3106,7 @@ impl Default for SessionMeta { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: ThreadHistoryMode::default(), multi_agent_version: None, context_window: None, } @@ -5542,6 +5587,70 @@ mod tests { Ok(()) } + #[test] + fn session_meta_defaults_legacy_history_mode() -> Result<()> { + let session_meta: SessionMeta = serde_json::from_value(json!({ + "session_id": "00000000-0000-0000-0000-000000000001", + "id": "00000000-0000-0000-0000-000000000001", + "timestamp": "2026-01-01T00:00:00Z", + "cwd": "/tmp", + "originator": "codex", + "cli_version": "0.0.0", + "model_provider": null, + "base_instructions": null + }))?; + + assert_eq!(session_meta.history_mode, ThreadHistoryMode::Legacy); + let serialized = serde_json::to_value(&session_meta)?; + assert_eq!(serialized["history_mode"], json!("legacy")); + let mut unknown = serialized; + unknown["history_mode"] = json!("future"); + assert!(serde_json::from_value::(unknown).is_err()); + Ok(()) + } + + #[test] + fn resumed_history_uses_persisted_history_mode() -> Result<()> { + let thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000001")?; + let session_meta = RolloutItem::SessionMeta(SessionMetaLine { + meta: SessionMeta { + session_id: thread_id.into(), + id: thread_id, + history_mode: ThreadHistoryMode::Paginated, + ..SessionMeta::default() + }, + git: None, + }); + let history = InitialHistory::Resumed(ResumedHistory { + conversation_id: thread_id, + history: Arc::new(vec![session_meta.clone()]), + rollout_path: None, + }); + + assert_eq!( + history.get_history_mode(ThreadHistoryMode::Legacy), + ThreadHistoryMode::Paginated + ); + assert_eq!( + InitialHistory::Forked(vec![session_meta]).get_history_mode(ThreadHistoryMode::Legacy), + ThreadHistoryMode::Legacy + ); + assert_eq!( + InitialHistory::New.get_history_mode(ThreadHistoryMode::Paginated), + ThreadHistoryMode::Paginated + ); + assert_eq!( + InitialHistory::Resumed(ResumedHistory { + conversation_id: thread_id, + history: Arc::new(Vec::new()), + rollout_path: None, + }) + .get_history_mode(ThreadHistoryMode::Paginated), + ThreadHistoryMode::Paginated + ); + Ok(()) + } + #[test] fn turn_context_item_deserializes_without_network() -> Result<()> { let item: TurnContextItem = serde_json::from_value(json!({ diff --git a/codex-rs/rollout/src/compression_tests.rs b/codex-rs/rollout/src/compression_tests.rs index 733e6cc5f..46c90d012 100644 --- a/codex-rs/rollout/src/compression_tests.rs +++ b/codex-rs/rollout/src/compression_tests.rs @@ -474,6 +474,7 @@ fn write_rollout(path: &std::path::Path, thread_id: ThreadId, message: &str) -> dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index b02b8c3e4..1154d36ee 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -26,7 +26,9 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::USER_MESSAGE_BEGIN; +use serde_json::Value; /// Returned page of thread (thread) summaries. #[derive(Debug, Default, PartialEq)] @@ -62,6 +64,8 @@ pub struct ThreadItem { pub git_origin_url: Option, /// Session source from session metadata. pub source: Option, + /// Persisted thread history contract selected when this thread was created. + pub history_mode: ThreadHistoryMode, /// Immediate control/spawn parent thread id from session metadata. pub parent_thread_id: Option, /// Random unique nickname from session metadata for AgentControl-spawned sub-agents. @@ -99,6 +103,7 @@ struct HeadTailSummary { git_sha: Option, git_origin_url: Option, source: Option, + history_mode: ThreadHistoryMode, parent_thread_id: Option, agent_nickname: Option, agent_role: Option, @@ -806,6 +811,7 @@ async fn build_thread_item( git_sha, git_origin_url, source, + history_mode, parent_thread_id, agent_nickname, agent_role, @@ -828,6 +834,7 @@ async fn build_thread_item( git_sha, git_origin_url, source, + history_mode, parent_thread_id, agent_nickname, agent_role, @@ -1117,12 +1124,26 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result = serde_json::from_str(trimmed); - let Ok(rollout_line) = parsed else { continue }; + let rollout_line = match parsed { + Ok(rollout_line) => rollout_line, + Err(_) => { + if !summary.saw_session_meta + && let Ok(value) = serde_json::from_str::(trimmed) + { + // The first SessionMeta belongs to this rollout. Later SessionMeta lines can + // be copied from fork history, so only an unknown mode before the first parsed + // SessionMeta should make this thread unreadable. + crate::recorder::reject_unknown_thread_history_mode(&value)?; + } + continue; + } + }; match rollout_line.item { RolloutItem::SessionMeta(session_meta_line) => { if !summary.saw_session_meta { summary.source = Some(session_meta_line.meta.source.clone()); + summary.history_mode = session_meta_line.meta.history_mode; summary.parent_thread_id = session_meta_line.meta.parent_thread_id; summary.agent_nickname = session_meta_line.meta.agent_nickname.clone(); summary.agent_role = session_meta_line.meta.agent_role.clone(); diff --git a/codex-rs/rollout/src/metadata.rs b/codex-rs/rollout/src/metadata.rs index d6ed80913..127a8af22 100644 --- a/codex-rs/rollout/src/metadata.rs +++ b/codex-rs/rollout/src/metadata.rs @@ -45,6 +45,7 @@ pub(crate) fn builder_from_session_meta( created_at, session_meta.meta.source.clone(), ); + builder.history_mode = session_meta.meta.history_mode; builder.model_provider = session_meta.meta.model_provider.clone(); builder.agent_nickname = session_meta.meta.agent_nickname.clone(); builder.agent_role = session_meta.meta.agent_role.clone(); diff --git a/codex-rs/rollout/src/metadata_tests.rs b/codex-rs/rollout/src/metadata_tests.rs index c0aea248c..2e1ea98f3 100644 --- a/codex-rs/rollout/src/metadata_tests.rs +++ b/codex-rs/rollout/src/metadata_tests.rs @@ -13,6 +13,7 @@ use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadHistoryMode; use codex_state::BackfillStatus; use codex_state::ThreadMetadataBuilder; use pretty_assertions::assert_eq; @@ -51,6 +52,7 @@ async fn extract_metadata_from_rollout_uses_session_meta() { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: ThreadHistoryMode::Paginated, multi_agent_version: None, context_window: None, }; @@ -81,6 +83,41 @@ async fn extract_metadata_from_rollout_uses_session_meta() { assert_eq!(outcome.parse_errors, 0); } +#[tokio::test] +async fn extract_metadata_from_rollout_rejects_unknown_history_mode() { + let dir = tempdir().expect("tempdir"); + let uuid = Uuid::new_v4(); + let id = ThreadId::from_string(&uuid.to_string()).expect("thread id"); + let path = dir + .path() + .join(format!("rollout-2026-01-27T12-34-56-{uuid}.jsonl")); + let mut rollout_line = serde_json::to_value(RolloutLine { + timestamp: "2026-01-27T12:34:56Z".to_string(), + item: RolloutItem::SessionMeta(SessionMetaLine { + meta: SessionMeta { + session_id: id.into(), + id, + timestamp: "2026-01-27T12:34:56Z".to_string(), + cwd: dir.path().to_path_buf(), + originator: "cli".to_string(), + cli_version: "0.0.0".to_string(), + ..SessionMeta::default() + }, + git: None, + }), + }) + .expect("serialize rollout line"); + rollout_line["payload"]["history_mode"] = serde_json::json!("future"); + let mut file = File::create(&path).expect("create rollout"); + writeln!(file, "{rollout_line}").expect("write rollout"); + + assert!( + extract_metadata_from_rollout(&path, "openai") + .await + .is_err() + ); +} + #[tokio::test] async fn extract_metadata_from_rollout_returns_latest_memory_mode() { let dir = tempdir().expect("tempdir"); @@ -109,6 +146,7 @@ async fn extract_metadata_from_rollout_returns_latest_memory_mode() { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }; @@ -379,6 +417,7 @@ fn write_rollout_in_sessions_with_cwd( dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }; diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 58d5c7666..87e97997a 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -61,6 +61,7 @@ use codex_protocol::protocol::SessionContextWindow; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadSource; use codex_state::StateRuntime; use codex_utils_path as path_utils; @@ -94,6 +95,7 @@ pub enum RolloutRecorderParams { dynamic_tools: Vec, selected_capability_roots: Vec, multi_agent_version: Option, + history_mode: ThreadHistoryMode, initial_window_id: Option, }, Resume { @@ -186,6 +188,7 @@ impl RolloutRecorderParams { dynamic_tools, selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: Default::default(), initial_window_id: None, } } @@ -225,6 +228,16 @@ impl RolloutRecorderParams { self } + pub fn with_history_mode(mut self, history_mode: ThreadHistoryMode) -> Self { + if let Self::Create { + history_mode: mode, .. + } = &mut self + { + *mode = history_mode; + } + self + } + pub fn with_initial_window_id(mut self, initial_window_id: String) -> Self { if let Self::Create { initial_window_id: window_id, @@ -751,6 +764,7 @@ impl RolloutRecorder { dynamic_tools, selected_capability_roots, multi_agent_version, + history_mode, initial_window_id, } => { let log_file_info = precompute_log_file_info(config, conversation_id)?; @@ -789,6 +803,7 @@ impl RolloutRecorder { }, selected_capability_roots, memory_mode: (!config.generate_memories()).then_some("disabled".to_string()), + history_mode, multi_agent_version, context_window: initial_window_id.map(SessionContextWindow::new), }; @@ -956,6 +971,12 @@ impl RolloutRecorder { items.push(item); } Err(e) => { + if thread_id.is_none() { + // The first SessionMeta belongs to this rollout. Later SessionMeta lines + // can be copied from fork history, so only validate unknown history modes + // before we have parsed the rollout's own SessionMeta. + reject_unknown_thread_history_mode(&v)?; + } trace!("failed to parse rollout line: {e}"); parse_errors = parse_errors.saturating_add(1); } @@ -1019,6 +1040,21 @@ impl RolloutRecorder { } } +pub(crate) fn reject_unknown_thread_history_mode(value: &Value) -> std::io::Result<()> { + if value.get("type").and_then(Value::as_str) != Some("session_meta") { + return Ok(()); + } + let Some(history_mode) = value + .get("payload") + .and_then(|payload| payload.get("history_mode")) + else { + return Ok(()); + }; + serde_json::from_value::(history_mode.clone()) + .map(|_| ()) + .map_err(|err| IoError::other(format!("invalid session metadata history_mode: {err}"))) +} + fn strip_legacy_ghost_snapshot_rollout_line(value: &mut Value) -> bool { match value.get("type").and_then(Value::as_str) { Some("response_item") => value @@ -1122,6 +1158,7 @@ fn fill_missing_thread_item_metadata(item: &mut ThreadItem, state_item: ThreadIt git_sha, git_origin_url, source, + history_mode: _, parent_thread_id, agent_nickname, agent_role, @@ -1837,6 +1874,7 @@ fn thread_item_from_state_metadata( .or_else(|_| serde_json::from_value(Value::String(item.source))) .unwrap_or(SessionSource::Unknown), ), + history_mode: item.history_mode, parent_thread_id, agent_nickname: item.agent_nickname, agent_role: item.agent_role, diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 71ec252c2..7fcaff08a 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -15,6 +15,7 @@ use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::TurnContextItem; use codex_protocol::protocol::UserMessageEvent; use pretty_assertions::assert_eq; @@ -103,6 +104,7 @@ async fn state_db_init_backfills_before_returning() -> anyhow::Result<()> { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, @@ -227,6 +229,43 @@ async fn load_rollout_items_defaults_legacy_session_id() -> std::io::Result<()> Ok(()) } +#[tokio::test] +async fn load_rollout_items_ignores_unknown_fork_source_history_mode() -> std::io::Result<()> { + let home = TempDir::new().expect("temp dir"); + let uuid = Uuid::new_v4(); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("thread id"); + let rollout_path = write_session_file(home.path(), "2025-01-03T12-00-00", uuid)?; + let mut file = fs::OpenOptions::new().append(true).open(&rollout_path)?; + let source_uuid = Uuid::new_v4(); + writeln!( + file, + "{}", + serde_json::json!({ + "timestamp": "2025-01-03T12:00:01Z", + "type": "session_meta", + "payload": { + "session_id": source_uuid, + "id": source_uuid, + "timestamp": "2025-01-03T12:00:01Z", + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version", + "source": "cli", + "model_provider": "test-provider", + "history_mode": "future", + }, + }) + )?; + + let (items, loaded_thread_id, parse_errors) = + RolloutRecorder::load_rollout_items(&rollout_path).await?; + + assert_eq!(loaded_thread_id, Some(thread_id)); + assert_eq!(parse_errors, 1); + assert_eq!(items.len(), 2); + Ok(()) +} + #[tokio::test] async fn load_rollout_items_preserves_legacy_guardian_assessment_lines() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); @@ -391,6 +430,7 @@ async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result< Vec::new(), ) .with_session_id(session_id) + .with_history_mode(ThreadHistoryMode::Paginated) .with_initial_window_id(initial_window_id.clone()), ) .await?; @@ -442,6 +482,7 @@ async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result< panic!("expected session metadata in rollout"); }; assert_eq!(session_meta.meta.session_id, session_id); + assert_eq!(session_meta.meta.history_mode, ThreadHistoryMode::Paginated); assert_eq!( session_meta .meta @@ -1004,6 +1045,7 @@ fn fill_missing_thread_item_metadata_preserves_identity_and_prefers_state_git_fi git_sha: Some("filesystem-sha".to_string()), git_origin_url: Some("https://example.com/filesystem.git".to_string()), source: None, + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -1023,6 +1065,7 @@ fn fill_missing_thread_item_metadata_preserves_identity_and_prefers_state_git_fi git_sha: Some("state-sha".to_string()), git_origin_url: Some("https://example.com/state.git".to_string()), source: Some(SessionSource::Exec), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: Some("state-agent".to_string()), agent_role: Some("state-role".to_string()), diff --git a/codex-rs/rollout/src/session_index_tests.rs b/codex-rs/rollout/src/session_index_tests.rs index be39fb06a..6eb1fa037 100644 --- a/codex-rs/rollout/src/session_index_tests.rs +++ b/codex-rs/rollout/src/session_index_tests.rs @@ -43,6 +43,7 @@ fn write_rollout_with_metadata(path: &Path, thread_id: ThreadId) -> std::io::Res dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, diff --git a/codex-rs/rollout/src/state_db_tests.rs b/codex-rs/rollout/src/state_db_tests.rs index bb341aa85..18566d887 100644 --- a/codex-rs/rollout/src/state_db_tests.rs +++ b/codex-rs/rollout/src/state_db_tests.rs @@ -176,6 +176,7 @@ fn write_rollout_with_user_message( dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, diff --git a/codex-rs/rollout/src/tests.rs b/codex-rs/rollout/src/tests.rs index 901461cf5..23f72cb94 100644 --- a/codex-rs/rollout/src/tests.rs +++ b/codex-rs/rollout/src/tests.rs @@ -129,6 +129,41 @@ async fn find_thread_path_falls_back_when_db_path_is_stale() { assert_state_db_rollout_path(home, thread_id, Some(fs_rollout_path.as_path())).await; } +#[tokio::test] +async fn read_thread_item_from_rollout_rejects_unknown_canonical_history_mode() { + let temp = TempDir::new().unwrap(); + let home = temp.path(); + let uuid = Uuid::from_u128(303); + let ts = "2025-01-03T12-00-00"; + write_session_file( + home, + ts, + uuid, + /*num_records*/ 1, + Some(SessionSource::Cli), + ) + .unwrap(); + let path = home.join(format!("sessions/2025/01/03/rollout-{ts}-{uuid}.jsonl")); + let contents = fs::read_to_string(&path).unwrap(); + let mut lines = contents.lines(); + let mut canonical_meta: serde_json::Value = + serde_json::from_str(lines.next().expect("canonical session meta")).unwrap(); + canonical_meta["payload"]["history_mode"] = serde_json::json!("future"); + let user_event = lines.next().expect("user event"); + let mut source_meta = canonical_meta.clone(); + let source_uuid = Uuid::from_u128(304); + source_meta["payload"]["session_id"] = serde_json::json!(source_uuid); + source_meta["payload"]["id"] = serde_json::json!(source_uuid); + source_meta["payload"]["history_mode"] = serde_json::json!("legacy"); + fs::write( + &path, + format!("{canonical_meta}\n{user_event}\n{source_meta}\n"), + ) + .unwrap(); + + assert_eq!(crate::list::read_thread_item_from_rollout(path).await, None); +} + #[tokio::test] async fn find_thread_path_falls_back_when_db_path_points_to_another_thread() { let temp = TempDir::new().unwrap(); @@ -610,6 +645,7 @@ async fn test_list_conversations_latest_first() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -629,6 +665,7 @@ async fn test_list_conversations_latest_first() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -648,6 +685,7 @@ async fn test_list_conversations_latest_first() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -760,6 +798,7 @@ async fn test_pagination_cursor() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -779,6 +818,7 @@ async fn test_pagination_cursor() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -834,6 +874,7 @@ async fn test_pagination_cursor() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -853,6 +894,7 @@ async fn test_pagination_cursor() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -900,6 +942,7 @@ async fn test_pagination_cursor() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -1072,6 +1115,7 @@ async fn test_get_thread_contents() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -1290,6 +1334,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, @@ -1432,6 +1477,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, @@ -1451,6 +1497,7 @@ async fn test_timestamp_only_cursor_skips_same_second_filesystem_ties() { git_sha: None, git_origin_url: None, source: Some(SessionSource::VSCode), + history_mode: Default::default(), parent_thread_id: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/state/migrations/0040_threads_history_mode.sql b/codex-rs/state/migrations/0040_threads_history_mode.sql new file mode 100644 index 000000000..53729c2ec --- /dev/null +++ b/codex-rs/state/migrations/0040_threads_history_mode.sql @@ -0,0 +1 @@ +ALTER TABLE threads ADD COLUMN history_mode TEXT NOT NULL DEFAULT 'legacy'; diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index 81de0cc39..f944e3e38 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -56,6 +56,7 @@ fn apply_session_meta_from_item(metadata: &mut ThreadMetadata, meta_line: &Sessi } metadata.id = meta_line.meta.id; metadata.source = enum_to_string(&meta_line.meta.source); + metadata.history_mode = meta_line.meta.history_mode; metadata.thread_source = meta_line.meta.thread_source.clone(); metadata.agent_nickname = meta_line.meta.agent_nickname.clone(); metadata.agent_role = meta_line.meta.agent_role.clone(); @@ -345,6 +346,7 @@ mod tests { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, @@ -538,6 +540,7 @@ mod tests { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, @@ -560,6 +563,7 @@ mod tests { updated_at: created_at, recency_at: created_at, source: "cli".to_string(), + history_mode: Default::default(), thread_source: None, agent_path: None, agent_nickname: None, diff --git a/codex-rs/state/src/model/thread_metadata.rs b/codex-rs/state/src/model/thread_metadata.rs index ac8096e45..0024c9866 100644 --- a/codex-rs/state/src/model/thread_metadata.rs +++ b/codex-rs/state/src/model/thread_metadata.rs @@ -6,6 +6,7 @@ use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadSource; use sqlx::Row; use sqlx::sqlite::SqliteRow; @@ -87,6 +88,8 @@ pub struct ThreadMetadata { pub recency_at: DateTime, /// The session source (stringified enum). pub source: String, + /// Persisted thread history contract selected when this thread was created. + pub history_mode: ThreadHistoryMode, /// Optional analytics source classification for this thread. pub thread_source: Option, /// Optional random unique nickname assigned to an AgentControl-spawned sub-agent. @@ -142,6 +145,8 @@ pub struct ThreadMetadataBuilder { pub recency_at: Option>, /// The session source. pub source: SessionSource, + /// Persisted thread history contract selected when this thread was created. + pub history_mode: ThreadHistoryMode, /// Optional analytics source classification for this thread. pub thread_source: Option, /// Optional random unique nickname assigned to the session. @@ -185,6 +190,7 @@ impl ThreadMetadataBuilder { updated_at: None, recency_at: None, source, + history_mode: ThreadHistoryMode::Legacy, thread_source: None, agent_nickname: None, agent_role: None, @@ -222,6 +228,7 @@ impl ThreadMetadataBuilder { updated_at, recency_at, source, + history_mode: self.history_mode, thread_source: self.thread_source.clone(), agent_nickname: self.agent_nickname.clone(), agent_role: self.agent_role.clone(), @@ -368,6 +375,7 @@ pub(crate) struct ThreadRow { updated_at: i64, recency_at: i64, source: String, + history_mode: String, thread_source: Option, agent_nickname: Option, agent_role: Option, @@ -398,6 +406,7 @@ impl ThreadRow { updated_at: row.try_get("updated_at")?, recency_at: row.try_get("recency_at")?, source: row.try_get("source")?, + history_mode: row.try_get("history_mode")?, thread_source: row.try_get("thread_source")?, agent_nickname: row.try_get("agent_nickname")?, agent_role: row.try_get("agent_role")?, @@ -432,6 +441,7 @@ impl TryFrom for ThreadMetadata { updated_at, recency_at, source, + history_mode, thread_source, agent_nickname, agent_role, @@ -456,6 +466,7 @@ impl TryFrom for ThreadMetadata { .map(|thread_source| thread_source.parse()) .transpose() .map_err(anyhow::Error::msg)?; + let history_mode = history_mode.parse().map_err(anyhow::Error::msg)?; Ok(Self { id: ThreadId::try_from(id)?, rollout_path: PathBuf::from(rollout_path), @@ -463,6 +474,7 @@ impl TryFrom for ThreadMetadata { updated_at: epoch_millis_to_datetime(updated_at)?, recency_at: epoch_millis_to_datetime(recency_at)?, source, + history_mode, thread_source, agent_nickname, agent_role, @@ -548,6 +560,7 @@ mod tests { use chrono::Utc; use codex_protocol::ThreadId; use codex_protocol::openai_models::ReasoningEffort; + use codex_protocol::protocol::ThreadHistoryMode; use pretty_assertions::assert_eq; use std::path::PathBuf; @@ -559,6 +572,7 @@ mod tests { updated_at: 1_700_000_100, recency_at: 1_700_000_100, source: "cli".to_string(), + history_mode: "legacy".to_string(), thread_source: None, agent_nickname: None, agent_role: None, @@ -590,6 +604,7 @@ mod tests { updated_at: DateTime::::from_timestamp(1_700_000_100, 0).expect("timestamp"), recency_at: DateTime::::from_timestamp(1_700_000_100, 0).expect("timestamp"), source: "cli".to_string(), + history_mode: ThreadHistoryMode::Legacy, thread_source: None, agent_nickname: None, agent_role: None, @@ -633,4 +648,12 @@ mod tests { expected_thread_metadata(Some(ReasoningEffort::Custom("future".to_string()))) ); } + + #[test] + fn thread_row_rejects_unknown_history_mode() { + let mut row = thread_row(/*reasoning_effort*/ None); + row.history_mode = "future".to_string(); + + assert!(ThreadMetadata::try_from(row).is_err()); + } } diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 83df50e02..ff25fed98 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -136,6 +136,7 @@ WHERE kind = ? AND job_key = ? /// - starts from `threads` filtered to active threads and allowed sources /// (`push_thread_filters`) /// - excludes threads with `memory_mode != 'enabled'` + /// - excludes paginated threads because stage 1 still full-loads rollout JSONL /// - excludes the current thread id /// - keeps only threads whose millisecond `updated_at` is in the age window /// - checks memory staleness against the memories DB @@ -177,6 +178,7 @@ SELECT threads.updated_at_ms AS updated_at, threads.recency_at_ms AS recency_at, threads.source, + threads.history_mode, threads.thread_source, threads.agent_path, threads.agent_nickname, @@ -213,7 +215,7 @@ FROM threads }, /*include_thread_id_tiebreaker*/ false, ); - builder.push(" AND threads.memory_mode = 'enabled'"); + builder.push(" AND threads.memory_mode = 'enabled' AND threads.history_mode = 'legacy'"); builder .push(" AND threads.id != ") .push_bind(current_thread_id.as_str()); @@ -549,6 +551,7 @@ SELECT threads.updated_at_ms AS updated_at, threads.recency_at_ms AS recency_at, threads.source, + threads.history_mode, threads.thread_source, threads.agent_nickname, threads.agent_role, @@ -569,7 +572,7 @@ SELECT threads.git_branch, threads.git_origin_url FROM threads -WHERE threads.id = ? AND threads.memory_mode = 'enabled' +WHERE threads.id = ? AND threads.memory_mode = 'enabled' AND threads.history_mode = 'legacy' "#, ) .bind(thread_id.to_string()) @@ -1673,6 +1676,7 @@ mod tests { use chrono::Duration; use chrono::Utc; use codex_protocol::ThreadId; + use codex_protocol::protocol::ThreadHistoryMode; use pretty_assertions::assert_eq; use sqlx::Row; use std::sync::Arc; @@ -2161,7 +2165,7 @@ mod tests { } #[tokio::test] - async fn claim_stage1_jobs_skips_threads_with_disabled_memory_mode() { + async fn claim_stage1_jobs_skips_threads_without_legacy_enabled_memory() { let codex_home = unique_temp_dir(); let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await @@ -2174,6 +2178,8 @@ mod tests { ThreadId::from_string(&Uuid::new_v4().to_string()).expect("current thread id"); let disabled_thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("disabled thread id"); + let paginated_thread_id = + ThreadId::from_string(&Uuid::new_v4().to_string()).expect("paginated thread id"); let enabled_thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("enabled thread id"); @@ -2200,6 +2206,19 @@ mod tests { .await .expect("disable thread memory mode"); + let mut paginated = test_thread_metadata( + &codex_home, + paginated_thread_id, + codex_home.join("paginated"), + ); + paginated.created_at = eligible_at; + paginated.updated_at = eligible_at; + paginated.history_mode = ThreadHistoryMode::Paginated; + runtime + .upsert_thread(&paginated) + .await + .expect("upsert paginated thread"); + let mut enabled = test_thread_metadata(&codex_home, enabled_thread_id, codex_home.join("enabled")); enabled.created_at = eligible_at; diff --git a/codex-rs/state/src/runtime/test_support.rs b/codex-rs/state/src/runtime/test_support.rs index c029a0ee2..8c6306684 100644 --- a/codex-rs/state/src/runtime/test_support.rs +++ b/codex-rs/state/src/runtime/test_support.rs @@ -11,6 +11,8 @@ use codex_protocol::protocol::AskForApproval; #[cfg(test)] use codex_protocol::protocol::SandboxPolicy; #[cfg(test)] +use codex_protocol::protocol::ThreadHistoryMode; +#[cfg(test)] use std::path::Path; #[cfg(test)] use std::path::PathBuf; @@ -49,6 +51,7 @@ pub(super) fn test_thread_metadata( updated_at: now, recency_at: now, source: "cli".to_string(), + history_mode: ThreadHistoryMode::Legacy, thread_source: None, agent_nickname: None, agent_role: None, diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index e5ab6fb31..c80b2b671 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -15,6 +15,7 @@ SELECT threads.updated_at_ms AS updated_at, threads.recency_at_ms AS recency_at, threads.source, + threads.history_mode, threads.thread_source, threads.agent_nickname, threads.agent_role, @@ -544,6 +545,7 @@ INSERT INTO threads ( updated_at_ms, recency_at_ms, source, + history_mode, thread_source, agent_nickname, agent_role, @@ -565,7 +567,7 @@ INSERT INTO threads ( git_branch, git_origin_url, memory_mode -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO NOTHING "#, ) @@ -578,6 +580,7 @@ ON CONFLICT(id) DO NOTHING .bind(datetime_to_epoch_millis(updated_at)) .bind(datetime_to_epoch_millis(recency_at)) .bind(metadata.source.as_str()) + .bind(metadata.history_mode.as_str()) .bind( metadata .thread_source @@ -796,6 +799,7 @@ INSERT INTO threads ( updated_at_ms, recency_at_ms, source, + history_mode, thread_source, agent_nickname, agent_role, @@ -817,7 +821,7 @@ INSERT INTO threads ( git_branch, git_origin_url, memory_mode -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET rollout_path = excluded.rollout_path, created_at = excluded.created_at, @@ -827,6 +831,7 @@ ON CONFLICT(id) DO UPDATE SET updated_at_ms = excluded.updated_at_ms, recency_at_ms = threads.recency_at_ms, source = excluded.source, + history_mode = excluded.history_mode, thread_source = excluded.thread_source, agent_nickname = excluded.agent_nickname, agent_role = excluded.agent_role, @@ -858,6 +863,7 @@ ON CONFLICT(id) DO UPDATE SET .bind(datetime_to_epoch_millis(updated_at)) .bind(datetime_to_epoch_millis(insert_recency_at)) .bind(metadata.source.as_str()) + .bind(metadata.history_mode.as_str()) .bind( metadata .thread_source @@ -1206,6 +1212,7 @@ SELECT threads.updated_at_ms AS updated_at, threads.recency_at_ms AS recency_at, threads.source, + threads.history_mode, threads.thread_source, threads.agent_nickname, threads.agent_role, @@ -1419,6 +1426,7 @@ mod tests { use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; + use codex_protocol::protocol::ThreadHistoryMode; use pretty_assertions::assert_eq; use serde_json::json; use std::path::PathBuf; @@ -1461,6 +1469,30 @@ mod tests { assert_eq!(memory_mode, "disabled"); } + #[tokio::test] + async fn thread_metadata_round_trips_history_mode() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("state db should initialize"); + let thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000124").expect("valid thread id"); + let mut metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone()); + metadata.history_mode = ThreadHistoryMode::Paginated; + + runtime + .upsert_thread(&metadata) + .await + .expect("upsert should succeed"); + + let metadata = runtime + .get_thread(thread_id) + .await + .expect("thread should load") + .expect("thread should exist"); + assert_eq!(metadata.history_mode, ThreadHistoryMode::Paginated); + } + #[tokio::test] async fn delete_thread_cleans_associated_state() -> Result<()> { let codex_home = unique_temp_dir(); @@ -2150,6 +2182,7 @@ mod tests { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: Some("polluted".to_string()), + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, @@ -2214,6 +2247,7 @@ mod tests { dynamic_tools: None, selected_capability_roots: Vec::new(), memory_mode: None, + history_mode: Default::default(), multi_agent_version: None, context_window: None, }, diff --git a/codex-rs/thread-store/src/error.rs b/codex-rs/thread-store/src/error.rs index 2244c9318..bc5e8ed7c 100644 --- a/codex-rs/thread-store/src/error.rs +++ b/codex-rs/thread-store/src/error.rs @@ -1,8 +1,20 @@ use codex_protocol::ThreadId; +use codex_protocol::protocol::ThreadHistoryMode; /// Result type returned by thread-store operations. pub type ThreadStoreResult = Result; +pub(crate) fn reject_paginated_history_mode( + history_mode: ThreadHistoryMode, +) -> ThreadStoreResult<()> { + if matches!(history_mode, ThreadHistoryMode::Paginated) { + return Err(ThreadStoreError::Unsupported { + operation: "paginated_threads", + }); + } + Ok(()) +} + /// Error type shared by thread-store implementations. #[derive(Debug, thiserror::Error)] pub enum ThreadStoreError { diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index d64510f22..38598830f 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -14,6 +14,7 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionContextWindow; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadMemoryMode; use codex_rollout::persisted_rollout_items; @@ -36,6 +37,8 @@ use crate::ThreadStoreError; use crate::ThreadStoreFuture; use crate::ThreadStoreResult; use crate::UpdateThreadMetadataParams; +use crate::error::reject_paginated_history_mode; +use crate::types::canonical_history_mode_from_rollout_items; static IN_MEMORY_THREAD_STORES: OnceLock>>> = OnceLock::new(); @@ -128,6 +131,7 @@ mod tests { dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: ThreadHistoryMode::Legacy, initial_window_id: uuid::Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: None, @@ -193,6 +197,152 @@ mod tests { HashSet::from([child_thread_id, grandchild_thread_id]) ); } + + #[tokio::test] + async fn paginated_threads_allow_metadata_reads_and_reject_legacy_history_paths() { + let store = InMemoryThreadStore::default(); + let thread_id = ThreadId::default(); + let rollout_path = PathBuf::from("/tmp/paginated-thread.jsonl"); + + store + .create_thread(create_thread_params(thread_id, ThreadHistoryMode::Legacy)) + .await + .expect("create legacy thread"); + store + .resume_thread(ResumeThreadParams { + thread_id, + rollout_path: Some(rollout_path.clone()), + history: None, + include_archived: false, + metadata: thread_metadata(), + }) + .await + .expect("register rollout path"); + store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + history_mode: Some(ThreadHistoryMode::Paginated), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("seed paginated metadata"); + + let thread = store + .read_thread(ReadThreadParams { + thread_id, + include_archived: false, + include_history: false, + }) + .await + .expect("metadata read"); + assert_eq!(thread.history_mode, ThreadHistoryMode::Paginated); + assert!(thread.history.is_none()); + + let thread = store + .read_thread_by_rollout_path(ReadThreadByRolloutPathParams { + rollout_path, + include_archived: false, + include_history: false, + }) + .await + .expect("metadata path read"); + assert_eq!(thread.history_mode, ThreadHistoryMode::Paginated); + assert!(thread.history.is_none()); + + assert_paginated_threads_unsupported( + store + .read_thread(ReadThreadParams { + thread_id, + include_archived: false, + include_history: true, + }) + .await + .expect_err("full history read should fail"), + ); + assert_paginated_threads_unsupported( + store + .read_thread_by_rollout_path(ReadThreadByRolloutPathParams { + rollout_path: PathBuf::from("/tmp/paginated-thread.jsonl"), + include_archived: false, + include_history: true, + }) + .await + .expect_err("full history path read should fail"), + ); + assert_paginated_threads_unsupported( + store + .load_history(LoadThreadHistoryParams { + thread_id, + include_archived: false, + }) + .await + .expect_err("history load should fail"), + ); + assert_paginated_threads_unsupported( + store + .resume_thread(ResumeThreadParams { + thread_id, + rollout_path: None, + history: None, + include_archived: false, + metadata: thread_metadata(), + }) + .await + .expect_err("resume should fail"), + ); + assert_paginated_threads_unsupported( + store + .create_thread(create_thread_params( + ThreadId::default(), + ThreadHistoryMode::Paginated, + )) + .await + .expect_err("paginated create should fail"), + ); + } + + fn create_thread_params( + thread_id: ThreadId, + history_mode: ThreadHistoryMode, + ) -> CreateThreadParams { + CreateThreadParams { + session_id: thread_id.into(), + thread_id, + extra_config: None, + forked_from_id: None, + parent_thread_id: None, + source: SessionSource::Exec, + thread_source: None, + originator: "test_originator".to_string(), + base_instructions: BaseInstructions::default(), + dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), + multi_agent_version: None, + history_mode, + initial_window_id: uuid::Uuid::now_v7().to_string(), + metadata: thread_metadata(), + } + } + + fn thread_metadata() -> ThreadPersistenceMetadata { + ThreadPersistenceMetadata { + cwd: None, + model_provider: "test-provider".to_string(), + memory_mode: ThreadMemoryMode::Enabled, + } + } + + fn assert_paginated_threads_unsupported(err: ThreadStoreError) { + assert!(matches!( + err, + ThreadStoreError::Unsupported { + operation: "paginated_threads" + } + )); + } } fn stores_guard() -> MutexGuard<'static, HashMap>> { @@ -265,6 +415,7 @@ impl InMemoryThreadStore { } async fn create_thread(&self, params: CreateThreadParams) -> ThreadStoreResult<()> { + reject_paginated_history_mode(params.history_mode)?; let mut state = self.state.lock().await; state.calls.create_thread += 1; let session_meta = SessionMeta { @@ -285,6 +436,7 @@ impl InMemoryThreadStore { selected_capability_roots: params.selected_capability_roots.clone(), memory_mode: matches!(params.metadata.memory_mode, ThreadMemoryMode::Disabled) .then_some("disabled".to_string()), + history_mode: params.history_mode, multi_agent_version: params.multi_agent_version, context_window: Some(SessionContextWindow::new(params.initial_window_id.clone())), ..SessionMeta::default() @@ -304,6 +456,13 @@ impl InMemoryThreadStore { async fn resume_thread(&self, params: ResumeThreadParams) -> ThreadStoreResult<()> { let mut state = self.state.lock().await; state.calls.resume_thread += 1; + let history_mode = params + .history + .as_deref() + .map(Vec::as_slice) + .map(canonical_history_mode_from_rollout_items) + .unwrap_or_else(|| history_mode_from_state(&state, params.thread_id)); + reject_paginated_history_mode(history_mode)?; if let Some(history) = params.history { state .histories @@ -338,14 +497,18 @@ impl InMemoryThreadStore { ) -> ThreadStoreResult { let mut state = self.state.lock().await; state.calls.load_history += 1; - let items = state.histories.get(¶ms.thread_id).cloned().ok_or( - ThreadStoreError::ThreadNotFound { - thread_id: params.thread_id, - }, - )?; + let items = + state + .histories + .get(¶ms.thread_id) + .ok_or(ThreadStoreError::ThreadNotFound { + thread_id: params.thread_id, + })?; + let history_mode = history_mode_from_state(&state, params.thread_id); + reject_paginated_history_mode(history_mode)?; Ok(StoredThreadHistory { thread_id: params.thread_id, - items, + items: items.clone(), }) } @@ -354,8 +517,10 @@ impl InMemoryThreadStore { state.calls.read_thread += 1; if params.include_history { state.calls.read_thread_with_history += 1; + reject_paginated_history_mode(history_mode_from_state(&state, params.thread_id))?; } - stored_thread_from_state(&state, params.thread_id, params.include_history) + let thread = stored_thread_from_state(&state, params.thread_id, params.include_history)?; + Ok(thread) } async fn read_thread_by_rollout_path( @@ -372,7 +537,11 @@ impl InMemoryThreadStore { ), }); }; - stored_thread_from_state(&state, thread_id, params.include_history) + if params.include_history { + reject_paginated_history_mode(history_mode_from_state(&state, thread_id))?; + } + let thread = stored_thread_from_state(&state, thread_id, params.include_history)?; + Ok(thread) } async fn list_threads(&self) -> ThreadStoreResult { @@ -615,6 +784,9 @@ fn stored_thread_from_state( source: metadata .and_then(|metadata| metadata.source.clone()) .unwrap_or_else(|| created.source.clone()), + history_mode: metadata + .and_then(|metadata| metadata.history_mode) + .unwrap_or(created.history_mode), thread_source: metadata .and_then(|metadata| metadata.thread_source.clone()) .unwrap_or_else(|| created.thread_source.clone()), @@ -634,6 +806,23 @@ fn stored_thread_from_state( }) } +fn history_mode_from_state( + state: &InMemoryThreadStoreState, + thread_id: ThreadId, +) -> ThreadHistoryMode { + state + .metadata_updates + .get(&thread_id) + .and_then(|metadata| metadata.history_mode) + .or_else(|| { + state + .created_threads + .get(&thread_id) + .map(|thread| thread.history_mode) + }) + .unwrap_or_default() +} + fn git_info_from_patch(patch: &ThreadMetadataPatch) -> Option { let git_info = patch.git_info.as_ref()?; let sha = git_info.sha.clone().flatten(); diff --git a/codex-rs/thread-store/src/local/create_thread.rs b/codex-rs/thread-store/src/local/create_thread.rs index 451931b4d..9b370ef09 100644 --- a/codex-rs/thread-store/src/local/create_thread.rs +++ b/codex-rs/thread-store/src/local/create_thread.rs @@ -2,6 +2,7 @@ use super::LocalThreadStore; use crate::CreateThreadParams; use crate::ThreadStoreError; use crate::ThreadStoreResult; +use crate::error::reject_paginated_history_mode; use codex_protocol::protocol::ThreadMemoryMode; use codex_rollout::RolloutConfig; use codex_rollout::RolloutRecorder; @@ -11,6 +12,7 @@ pub(super) async fn create_thread( store: &LocalThreadStore, params: CreateThreadParams, ) -> ThreadStoreResult { + reject_paginated_history_mode(params.history_mode)?; let cwd = params .metadata .cwd @@ -40,6 +42,7 @@ pub(super) async fn create_thread( .with_session_id(params.session_id) .with_selected_capability_roots(params.selected_capability_roots) .with_multi_agent_version(params.multi_agent_version) + .with_history_mode(params.history_mode) .with_initial_window_id(params.initial_window_id), ) .await diff --git a/codex-rs/thread-store/src/local/helpers.rs b/codex-rs/thread-store/src/local/helpers.rs index f48a0136c..d883bd787 100644 --- a/codex-rs/thread-store/src/local/helpers.rs +++ b/codex-rs/thread-store/src/local/helpers.rs @@ -142,6 +142,7 @@ pub(super) fn stored_thread_from_rollout_item( cwd: item.cwd.unwrap_or_default(), cli_version: item.cli_version.unwrap_or_default(), source, + history_mode: item.history_mode, thread_source: None, agent_nickname: item.agent_nickname, agent_role: item.agent_role, diff --git a/codex-rs/thread-store/src/local/list_threads.rs b/codex-rs/thread-store/src/local/list_threads.rs index 2619fe68a..f51423133 100644 --- a/codex-rs/thread-store/src/local/list_threads.rs +++ b/codex-rs/thread-store/src/local/list_threads.rs @@ -219,6 +219,7 @@ mod tests { use chrono::Utc; use codex_protocol::ThreadId; use codex_protocol::protocol::SessionSource; + use codex_protocol::protocol::ThreadHistoryMode; use pretty_assertions::assert_eq; use std::fs; use tempfile::TempDir; @@ -243,6 +244,7 @@ mod tests { Uuid::from_u128(102), "Hello from user", /*model_provider*/ None, + ThreadHistoryMode::Legacy, ) .expect("session file"); diff --git a/codex-rs/thread-store/src/local/live_writer.rs b/codex-rs/thread-store/src/local/live_writer.rs index 3e7541b36..85ef80d88 100644 --- a/codex-rs/thread-store/src/local/live_writer.rs +++ b/codex-rs/thread-store/src/local/live_writer.rs @@ -16,6 +16,8 @@ use crate::ReadThreadParams; use crate::ResumeThreadParams; use crate::ThreadStoreError; use crate::ThreadStoreResult; +use crate::error::reject_paginated_history_mode; +use crate::types::canonical_history_mode_from_rollout_items; const ROLLOUT_SIZE_BYTES_METRIC: &str = "codex.rollout.size_bytes"; @@ -34,6 +36,30 @@ pub(super) async fn resume_thread( params: ResumeThreadParams, ) -> ThreadStoreResult<()> { store.ensure_live_recorder_absent(params.thread_id).await?; + let history_mode = if let Some(history) = params.history.as_deref() { + canonical_history_mode_from_rollout_items(history) + } else if let Some(rollout_path) = params.rollout_path.as_ref() { + super::read_thread::read_thread_by_rollout_path( + store, + rollout_path.clone(), + params.include_archived, + /*include_history*/ false, + ) + .await? + .history_mode + } else { + super::read_thread::read_thread( + store, + ReadThreadParams { + thread_id: params.thread_id, + include_archived: params.include_archived, + include_history: false, + }, + ) + .await? + .history_mode + }; + reject_paginated_history_mode(history_mode)?; let rollout_path = match (params.rollout_path, params.history) { (Some(rollout_path), _history) => rollout_path, (None, history) => { @@ -46,7 +72,6 @@ pub(super) async fn resume_thread( }, ) .await?; - thread .rollout_path .ok_or_else(|| ThreadStoreError::Internal { diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index 717ae09b6..bc34c52f3 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -321,6 +321,7 @@ mod tests { use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; + use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; @@ -333,6 +334,7 @@ mod tests { use crate::local::test_support::test_config; use crate::local::test_support::write_archived_session_file; use crate::local::test_support::write_session_file; + use crate::local::test_support::write_session_file_with_history_mode; #[tokio::test] async fn live_writer_lifecycle_writes_and_closes() { @@ -1109,18 +1111,108 @@ mod tests { .expect("read thread by rollout path"); assert_eq!(thread.thread_id, thread_id); + assert_eq!(thread.history_mode, ThreadHistoryMode::Legacy); assert_eq!( thread .history + .as_ref() .expect("history") .items - .into_iter() + .iter() .filter(|item| matches!(item, RolloutItem::EventMsg(EventMsg::UserMessage(_)))) .count(), 1 ); } + #[tokio::test] + async fn paginated_threads_allow_metadata_reads_and_reject_legacy_history_paths() { + let home = TempDir::new().expect("temp dir"); + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); + let uuid = uuid::Uuid::from_u128(408); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let rollout_path = write_session_file_with_history_mode( + home.path(), + "2025-01-04T12-00-00", + uuid, + ThreadHistoryMode::Paginated, + ) + .expect("session file"); + + let thread = store + .read_thread(ReadThreadParams { + thread_id, + include_archived: false, + include_history: false, + }) + .await + .expect("metadata read"); + assert_eq!(thread.history_mode, ThreadHistoryMode::Paginated); + assert!(thread.history.is_none()); + + let thread = store + .read_thread_by_rollout_path( + rollout_path.clone(), + /*include_archived*/ true, + /*include_history*/ false, + ) + .await + .expect("metadata path read"); + assert_eq!(thread.history_mode, ThreadHistoryMode::Paginated); + assert!(thread.history.is_none()); + + assert_paginated_threads_unsupported( + store + .read_thread(ReadThreadParams { + thread_id, + include_archived: false, + include_history: true, + }) + .await + .expect_err("full history read should fail"), + ); + assert_paginated_threads_unsupported( + store + .read_thread_by_rollout_path( + rollout_path.clone(), + /*include_archived*/ true, + /*include_history*/ true, + ) + .await + .expect_err("full history path read should fail"), + ); + assert_paginated_threads_unsupported( + store + .load_history(LoadThreadHistoryParams { + thread_id, + include_archived: false, + }) + .await + .expect_err("history load should fail"), + ); + assert_paginated_threads_unsupported( + store + .resume_thread(ResumeThreadParams { + thread_id, + rollout_path: Some(rollout_path), + history: None, + include_archived: false, + metadata: thread_metadata(), + }) + .await + .expect_err("resume should fail"), + ); + + let mut create_params = create_thread_params(ThreadId::default()); + create_params.history_mode = ThreadHistoryMode::Paginated; + assert_paginated_threads_unsupported( + store + .create_thread(create_params) + .await + .expect_err("paginated create should fail"), + ); + } + fn create_thread_params(thread_id: ThreadId) -> CreateThreadParams { CreateThreadParams { session_id: thread_id.into(), @@ -1135,11 +1227,21 @@ mod tests { dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: ThreadHistoryMode::Legacy, initial_window_id: uuid::Uuid::now_v7().to_string(), metadata: thread_metadata(), } } + fn assert_paginated_threads_unsupported(err: ThreadStoreError) { + assert!(matches!( + err, + ThreadStoreError::Unsupported { + operation: "paginated_threads" + } + )); + } + fn thread_metadata() -> ThreadPersistenceMetadata { ThreadPersistenceMetadata { cwd: Some(std::env::current_dir().expect("cwd")), diff --git a/codex-rs/thread-store/src/local/read_thread.rs b/codex-rs/thread-store/src/local/read_thread.rs index 1271e4fcf..27ce3076e 100644 --- a/codex-rs/thread-store/src/local/read_thread.rs +++ b/codex-rs/thread-store/src/local/read_thread.rs @@ -25,6 +25,7 @@ use crate::StoredThread; use crate::StoredThreadHistory; use crate::ThreadStoreError; use crate::ThreadStoreResult; +use crate::error::reject_paginated_history_mode; pub(super) async fn read_thread( store: &LocalThreadStore, @@ -47,7 +48,7 @@ pub(super) async fn read_thread( .await) { let metadata_sandbox_policy = metadata.sandbox_policy.clone(); - let mut thread = stored_thread_from_sqlite_metadata(store, metadata).await; + let mut thread = stored_thread_from_sqlite_metadata(store, metadata).await?; if !params.include_history && let Some(rollout_path) = thread.rollout_path.clone() && let Ok(mut rollout_thread) = read_thread_from_rollout_path(store, rollout_path).await @@ -66,6 +67,7 @@ pub(super) async fn read_thread( ); thread = rollout_thread; } + reject_paginated_history(&thread, params.include_history)?; attach_history_if_requested(&mut thread, params.include_history).await?; return Ok(thread); } @@ -82,6 +84,7 @@ pub(super) async fn read_thread( message: format!("thread {} is archived", thread.thread_id), }); } + reject_paginated_history(&thread, params.include_history)?; attach_history_if_requested(&mut thread, params.include_history).await?; Ok(thread) } @@ -132,10 +135,18 @@ pub(super) async fn read_thread_by_rollout_path( metadata.git_origin_url.or(fallback_origin_url), ); } + reject_paginated_history(&thread, include_history)?; attach_history_if_requested(&mut thread, include_history).await?; Ok(thread) } +fn reject_paginated_history(thread: &StoredThread, include_history: bool) -> ThreadStoreResult<()> { + if include_history { + reject_paginated_history_mode(thread.history_mode)?; + } + Ok(()) +} + async fn resolve_requested_rollout_path( store: &LocalThreadStore, rollout_path: std::path::PathBuf, @@ -261,16 +272,16 @@ async fn read_thread_from_rollout_path( message: format!("failed to read thread id from {}", path.display()), })?; thread.rollout_path = Some(codex_rollout::plain_rollout_path(path.as_path())); - if let Ok(meta_line) = read_session_meta_line(path.as_path()).await { - thread.forked_from_id = meta_line.meta.forked_from_id; - thread.parent_thread_id = meta_line.meta.parent_thread_id; - if let Some(model_provider) = meta_line - .meta - .model_provider - .filter(|provider| !provider.is_empty()) - { - thread.model_provider = model_provider; - } + let meta_line = read_required_session_meta_line(path.as_path()).await?; + thread.forked_from_id = meta_line.meta.forked_from_id; + thread.parent_thread_id = meta_line.meta.parent_thread_id; + thread.history_mode = meta_line.meta.history_mode; + if let Some(model_provider) = meta_line + .meta + .model_provider + .filter(|provider| !provider.is_empty()) + { + thread.model_provider = model_provider; } if let Ok(Some(title)) = find_thread_name_by_id(store.config.codex_home.as_path(), &thread.thread_id).await @@ -302,7 +313,7 @@ async fn read_sqlite_metadata( async fn stored_thread_from_sqlite_metadata( store: &LocalThreadStore, metadata: ThreadMetadata, -) -> StoredThread { +) -> ThreadStoreResult { let name = match distinct_thread_metadata_title(&metadata) { Some(title) => Some(title), None => find_thread_name_by_id(store.config.codex_home.as_path(), &metadata.id) @@ -311,13 +322,32 @@ async fn stored_thread_from_sqlite_metadata( .flatten() .filter(|title| !title.trim().is_empty()), }; - let session_meta = read_session_meta_line(metadata.rollout_path.as_path()) - .await - .ok() - .map(|meta_line| meta_line.meta); + let session_meta = match read_required_session_meta_line(metadata.rollout_path.as_path()).await + { + Ok(meta_line) => Some(meta_line.meta), + Err(_) + if codex_rollout::existing_rollout_path(metadata.rollout_path.as_path()) + .await + .is_none() => + { + None + } + Err(err) => { + return Err(ThreadStoreError::Internal { + message: format!( + "failed to read session metadata {}: {err}", + metadata.rollout_path.display() + ), + }); + } + }; let rollout_path = codex_rollout::plain_rollout_path(metadata.rollout_path.as_path()); let forked_from_id = session_meta.as_ref().and_then(|meta| meta.forked_from_id); let parent_thread_id = session_meta.as_ref().and_then(|meta| meta.parent_thread_id); + let history_mode = session_meta + .as_ref() + .map(|meta| meta.history_mode) + .unwrap_or(metadata.history_mode); let preview = metadata .preview .clone() @@ -325,7 +355,7 @@ async fn stored_thread_from_sqlite_metadata( .unwrap_or_default(); let permission_profile = permission_profile_from_metadata_value(&metadata.sandbox_policy, metadata.cwd.as_path()); - StoredThread { + Ok(StoredThread { thread_id: metadata.id, extra_config: None, rollout_path: Some(rollout_path), @@ -347,6 +377,7 @@ async fn stored_thread_from_sqlite_metadata( cwd: metadata.cwd, cli_version: metadata.cli_version, source: parse_session_source(&metadata.source), + history_mode, thread_source: metadata.thread_source, agent_nickname: metadata.agent_nickname, agent_role: metadata.agent_role, @@ -361,24 +392,30 @@ async fn stored_thread_from_sqlite_metadata( token_usage: None, first_user_message: metadata.first_user_message, history: None, - } + }) } async fn stored_thread_from_session_meta( store: &LocalThreadStore, path: std::path::PathBuf, ) -> ThreadStoreResult { - let meta_line = read_session_meta_line(path.as_path()) - .await - .map_err(|err| ThreadStoreError::Internal { - message: format!("failed to read thread {}: {err}", path.display()), - })?; + let meta_line = read_required_session_meta_line(path.as_path()).await?; let archived = rollout_path_is_archived(store.config.codex_home.as_path(), path.as_path()); Ok(stored_thread_from_meta_line( store, meta_line, path, archived, )) } +async fn read_required_session_meta_line( + path: &std::path::Path, +) -> ThreadStoreResult { + read_session_meta_line(path) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!("failed to read session metadata {}: {err}", path.display()), + }) +} + fn stored_thread_from_meta_line( store: &LocalThreadStore, meta_line: SessionMetaLine, @@ -414,6 +451,7 @@ fn stored_thread_from_meta_line( cwd: meta_line.meta.cwd, cli_version: meta_line.meta.cli_version, source: meta_line.meta.source, + history_mode: meta_line.meta.history_mode, thread_source: meta_line.meta.thread_source, agent_nickname: meta_line.meta.agent_nickname, agent_role: meta_line.meta.agent_role, @@ -457,6 +495,7 @@ mod tests { use codex_protocol::ThreadId; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; + use codex_protocol::protocol::ThreadHistoryMode; use codex_state::ThreadMetadataBuilder; use pretty_assertions::assert_eq; use tempfile::TempDir; @@ -665,6 +704,7 @@ mod tests { "Forked user message", Some("test-provider"), Some(parent_uuid), + ThreadHistoryMode::Legacy, ) .expect("forked session file"); diff --git a/codex-rs/thread-store/src/local/test_support.rs b/codex-rs/thread-store/src/local/test_support.rs index 10a55c4fd..a930067fc 100644 --- a/codex-rs/thread-store/src/local/test_support.rs +++ b/codex-rs/thread-store/src/local/test_support.rs @@ -3,6 +3,7 @@ use std::io::Write; use std::path::Path; use std::path::PathBuf; +use codex_protocol::protocol::ThreadHistoryMode; use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; use uuid::Uuid; @@ -17,6 +18,15 @@ pub(super) fn test_config(codex_home: &Path) -> LocalThreadStoreConfig { } pub(super) fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result { + write_session_file_with_history_mode(root, ts, uuid, ThreadHistoryMode::Legacy) +} + +pub(super) fn write_session_file_with_history_mode( + root: &Path, + ts: &str, + uuid: Uuid, + history_mode: ThreadHistoryMode, +) -> std::io::Result { write_session_file_with( root, root.join("sessions/2025/01/03"), @@ -24,6 +34,7 @@ pub(super) fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io:: uuid, "Hello from user", Some("test-provider"), + history_mode, ) } @@ -39,6 +50,7 @@ pub(super) fn write_archived_session_file( uuid, "Archived user message", Some("test-provider"), + ThreadHistoryMode::Legacy, ) } @@ -49,6 +61,7 @@ pub(super) fn write_session_file_with( uuid: Uuid, first_user_message: &str, model_provider: Option<&str>, + history_mode: ThreadHistoryMode, ) -> std::io::Result { write_session_file_with_fork( root, @@ -58,9 +71,11 @@ pub(super) fn write_session_file_with( first_user_message, model_provider, /*forked_from_id*/ None, + history_mode, ) } +#[allow(clippy::too_many_arguments)] pub(super) fn write_session_file_with_fork( root: &Path, day_dir: PathBuf, @@ -69,6 +84,7 @@ pub(super) fn write_session_file_with_fork( first_user_message: &str, model_provider: Option<&str>, forked_from_id: Option, + history_mode: ThreadHistoryMode, ) -> std::io::Result { fs::create_dir_all(&day_dir)?; let path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl")); @@ -86,6 +102,7 @@ pub(super) fn write_session_file_with_fork( "cli_version": "test_version", "source": "cli", "model_provider": model_provider, + "history_mode": history_mode, "git": { "commit_hash": "abcdef", "branch": "main", diff --git a/codex-rs/thread-store/src/local/update_thread_metadata.rs b/codex-rs/thread-store/src/local/update_thread_metadata.rs index 5f78b2e1f..38e919b87 100644 --- a/codex-rs/thread-store/src/local/update_thread_metadata.rs +++ b/codex-rs/thread-store/src/local/update_thread_metadata.rs @@ -27,6 +27,7 @@ use crate::ThreadMetadataPatch; use crate::ThreadStoreError; use crate::ThreadStoreResult; use crate::UpdateThreadMetadataParams; +use crate::error::reject_paginated_history_mode; use crate::local::read_thread; struct ResolvedRolloutPath { @@ -53,6 +54,20 @@ pub(super) async fn update_thread_metadata( } let needs_rollout_compat = needs_rollout_compatibility_update(&patch); + if needs_rollout_compat { + // These explicit patches still write legacy rollout/name-index state after the + // SQLite update. Paginated threads must fail before either side is mutated. + let thread = read_thread::read_thread( + store, + ReadThreadParams { + thread_id, + include_archived: params.include_archived, + include_history: false, + }, + ) + .await?; + reject_paginated_history_mode(thread.history_mode)?; + } let require_sqlite_write = sqlite_write_failure_should_block(&patch); let updated = apply_metadata_update( store, @@ -222,6 +237,7 @@ async fn apply_metadata_update( patch.source.clone().unwrap_or(SessionSource::Unknown), ); builder.model_provider = patch.model_provider.clone(); + builder.history_mode = patch.history_mode.unwrap_or_default(); builder.thread_source = patch.thread_source.clone().flatten(); builder.agent_nickname = patch.agent_nickname.clone().flatten(); builder.agent_role = patch.agent_role.clone().flatten(); @@ -269,6 +285,9 @@ async fn apply_metadata_update( if let Some(source) = patch.source { metadata.source = enum_to_string(&source); } + if let Some(history_mode) = patch.history_mode { + metadata.history_mode = history_mode; + } if let Some(thread_source) = patch.thread_source { metadata.thread_source = thread_source; } @@ -412,6 +431,7 @@ fn has_observed_metadata_facts(patch: &ThreadMetadataPatch) -> bool { || patch.permission_profile.is_some() || patch.token_usage.is_some() || patch.first_user_message.is_some() + || patch.history_mode.is_some() } fn enum_to_string(value: &T) -> String { @@ -632,6 +652,7 @@ fn rollout_path_is_archived(store: &LocalThreadStore, path: &Path) -> bool { #[cfg(test)] mod tests { use codex_protocol::models::PermissionProfile; + use codex_protocol::protocol::ThreadHistoryMode; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; @@ -651,6 +672,7 @@ mod tests { use crate::local::test_support::test_config; use crate::local::test_support::write_archived_session_file; use crate::local::test_support::write_session_file; + use crate::local::test_support::write_session_file_with_history_mode; #[tokio::test] async fn update_thread_metadata_sets_name_on_active_rollout_and_indexes_name() { @@ -719,6 +741,55 @@ mod tests { assert_eq!(memory_mode.as_deref(), Some("disabled")); } + #[tokio::test] + async fn update_thread_metadata_rejects_paginated_rollout_compatibility_writes() { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + let uuid = Uuid::from_u128(303); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let path = write_session_file_with_history_mode( + home.path(), + "2025-01-03T14-35-00", + uuid, + ThreadHistoryMode::Paginated, + ) + .expect("session file"); + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config, Some(runtime.clone())); + + assert!(matches!( + store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + memory_mode: Some(ThreadMemoryMode::Disabled), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect_err("paginated rollout compatibility write should fail"), + ThreadStoreError::Unsupported { + operation: "paginated_threads" + } + )); + + assert_eq!(last_rollout_item(path.as_path())["type"], "event_msg"); + assert_eq!( + runtime + .get_thread_memory_mode(thread_id) + .await + .expect("thread memory mode should be readable") + .as_deref(), + Some("enabled") + ); + } + #[tokio::test] async fn update_thread_metadata_preserves_memory_mode_when_updating_git_info() { let home = TempDir::new().expect("temp dir"); diff --git a/codex-rs/thread-store/src/store.rs b/codex-rs/thread-store/src/store.rs index 65ad1ce8a..67ee603f4 100644 --- a/codex-rs/thread-store/src/store.rs +++ b/codex-rs/thread-store/src/store.rs @@ -1,4 +1,5 @@ use codex_protocol::ThreadId; +use codex_protocol::protocol::ThreadHistoryMode; use std::any::Any; use std::future::Future; use std::pin::Pin; @@ -33,6 +34,14 @@ pub trait ThreadStore: Any + Send + Sync { /// Return this store as [`Any`] for implementation-owned escape hatches. fn as_any(&self) -> &dyn Any; + /// Returns the history mode to use when history does not carry a persisted mode. + /// + /// The default is legacy so existing stores stay compatible. Stores whose durable contract is + /// already paginated should override this instead of relying on core to infer storage behavior. + fn default_history_mode(&self) -> ThreadHistoryMode { + ThreadHistoryMode::Legacy + } + /// Creates a new live thread. fn create_thread(&self, params: CreateThreadParams) -> ThreadStoreFuture<'_, ()>; diff --git a/codex-rs/thread-store/src/thread_metadata_sync.rs b/codex-rs/thread-store/src/thread_metadata_sync.rs index 1057715a4..a7b52ee0c 100644 --- a/codex-rs/thread-store/src/thread_metadata_sync.rs +++ b/codex-rs/thread-store/src/thread_metadata_sync.rs @@ -76,6 +76,7 @@ impl ThreadMetadataSync { cli_version: Some(env!("CARGO_PKG_VERSION").to_string()), git_info: git_info.map(git_info_patch_from_observation), memory_mode: Some(params.metadata.memory_mode), + history_mode: Some(params.history_mode), ..Default::default() }; Self { @@ -237,6 +238,7 @@ impl ThreadMetadataSync { { update.memory_mode = Some(memory_mode); } + update.history_mode = Some(meta_line.meta.history_mode); } RolloutItem::TurnContext(turn_ctx) => { if !self.cwd_seen { @@ -375,6 +377,7 @@ fn update_has_metadata_facts(update: &ThreadMetadataPatch) -> bool { || update.first_user_message.is_some() || update.git_info.is_some() || update.memory_mode.is_some() + || update.history_mode.is_some() } fn git_info_patch_from_observation(git_info: GitInfo) -> GitInfoPatch { diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index f96a4df31..3b86840ca 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -15,6 +15,7 @@ use codex_protocol::protocol::GitInfo; use codex_protocol::protocol::MultiAgentVersion; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadMemoryMode as MemoryMode; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsage; @@ -91,6 +92,8 @@ pub struct CreateThreadParams { pub selected_capability_roots: Vec, /// Multi-agent runtime selected when the thread was created. pub multi_agent_version: Option, + /// Persisted thread history contract selected when the thread was created. + pub history_mode: ThreadHistoryMode, /// Initial context-window identity captured when the thread was created. pub initial_window_id: String, /// Metadata captured for the newly created thread. @@ -112,6 +115,20 @@ pub struct ResumeThreadParams { pub metadata: ThreadPersistenceMetadata, } +pub(crate) fn canonical_history_mode_from_rollout_items( + items: &[RolloutItem], +) -> ThreadHistoryMode { + // Forked rollouts keep copied source SessionMeta items after the new thread's + // canonical SessionMeta, so the thread contract comes from the first one. + items + .iter() + .find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.history_mode), + _ => None, + }) + .unwrap_or_default() +} + /// Parameters for appending rollout items to a live thread. #[derive(Clone, Debug)] pub struct AppendThreadItemsParams { @@ -430,6 +447,8 @@ pub struct StoredThread { pub cli_version: String, /// Runtime source for the thread. pub source: SessionSource, + /// Persisted thread history contract selected when this thread was created. + pub history_mode: ThreadHistoryMode, /// Optional analytics source classification for this thread. pub thread_source: Option, /// Optional random nickname for thread-spawn sub-agents. @@ -577,6 +596,8 @@ pub struct ThreadMetadataPatch { pub git_info: Option, /// Thread memory behavior. pub memory_mode: Option, + /// Persisted thread history contract. + pub history_mode: Option, } impl ThreadMetadataPatch { @@ -657,6 +678,9 @@ impl ThreadMetadataPatch { if next.memory_mode.is_some() { self.memory_mode = next.memory_mode; } + if next.history_mode.is_some() { + self.history_mode = next.history_mode; + } } pub fn is_empty(&self) -> bool { @@ -683,6 +707,7 @@ impl ThreadMetadataPatch { && self.first_user_message.is_none() && self.git_info.is_none() && self.memory_mode.is_none() + && self.history_mode.is_none() } } @@ -785,6 +810,17 @@ mod tests { assert!(decoded.is_empty()); } + #[test] + fn canonical_history_mode_uses_first_session_meta() { + assert_eq!( + canonical_history_mode_from_rollout_items(&[ + session_meta(ThreadHistoryMode::Legacy), + session_meta(ThreadHistoryMode::Paginated), + ]), + ThreadHistoryMode::Legacy + ); + } + #[test] fn thread_metadata_patch_merge_uses_presence_semantics() { let mut current = ThreadMetadataPatch { @@ -822,4 +858,14 @@ mod tests { }) ); } + + fn session_meta(history_mode: ThreadHistoryMode) -> RolloutItem { + RolloutItem::SessionMeta(codex_protocol::protocol::SessionMetaLine { + meta: codex_protocol::protocol::SessionMeta { + history_mode, + ..Default::default() + }, + git: None, + }) + } } diff --git a/codex-rs/tui/src/app/loaded_threads.rs b/codex-rs/tui/src/app/loaded_threads.rs index 8e790170e..9482453e9 100644 --- a/codex-rs/tui/src/app/loaded_threads.rs +++ b/codex-rs/tui/src/app/loaded_threads.rs @@ -134,6 +134,7 @@ mod tests { parent_thread_id: None, preview: String::new(), ephemeral: false, + history_mode: Default::default(), model_provider: "openai".to_string(), created_at: 0, updated_at: 0, diff --git a/codex-rs/tui/src/app/tests.rs b/codex-rs/tui/src/app/tests.rs index 29e29541a..d8d63575b 100644 --- a/codex-rs/tui/src/app/tests.rs +++ b/codex-rs/tui/src/app/tests.rs @@ -2880,6 +2880,7 @@ async fn inactive_thread_started_notification_initializes_replay_session() -> Re parent_thread_id: None, preview: "agent thread".to_string(), ephemeral: false, + history_mode: Default::default(), model_provider: "agent-provider".to_string(), created_at: 1, updated_at: 2, @@ -2974,6 +2975,7 @@ async fn inactive_thread_started_notification_preserves_primary_model_when_path_ parent_thread_id: None, preview: "agent thread".to_string(), ephemeral: false, + history_mode: Default::default(), model_provider: "agent-provider".to_string(), created_at: 1, updated_at: 2, @@ -3035,6 +3037,7 @@ async fn thread_read_session_state_does_not_reuse_primary_permission_profile() { parent_thread_id: None, preview: "read thread".to_string(), ephemeral: false, + history_mode: Default::default(), model_provider: "read-provider".to_string(), created_at: 1, updated_at: 2, @@ -5652,6 +5655,7 @@ async fn thread_rollback_response_discards_queued_active_thread_events() { parent_thread_id: None, preview: String::new(), ephemeral: false, + history_mode: Default::default(), model_provider: "openai".to_string(), created_at: 0, updated_at: 0, diff --git a/codex-rs/tui/src/app/thread_session_state.rs b/codex-rs/tui/src/app/thread_session_state.rs index 58412bff0..602c5b210 100644 --- a/codex-rs/tui/src/app/thread_session_state.rs +++ b/codex-rs/tui/src/app/thread_session_state.rs @@ -415,6 +415,7 @@ mod tests { parent_thread_id: None, preview: "read thread".to_string(), ephemeral: false, + history_mode: Default::default(), model_provider: "read-provider".to_string(), created_at: 1, updated_at: 2, diff --git a/codex-rs/tui/src/app_server_session.rs b/codex-rs/tui/src/app_server_session.rs index 7181d1a56..0405bb75a 100644 --- a/codex-rs/tui/src/app_server_session.rs +++ b/codex-rs/tui/src/app_server_session.rs @@ -2326,6 +2326,7 @@ mod tests { parent_thread_id: None, preview: "hello".to_string(), ephemeral: false, + history_mode: Default::default(), model_provider: "openai".to_string(), created_at: 1, updated_at: 2, diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index d74017d8d..e0e390724 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -5727,6 +5727,7 @@ session_picker_view = "dense" parent_thread_id: None, preview: String::from("remote thread"), ephemeral: false, + history_mode: Default::default(), model_provider: String::from("openai"), created_at: 1, updated_at: 2, @@ -5764,6 +5765,7 @@ session_picker_view = "dense" parent_thread_id: None, preview: String::from("preview"), ephemeral: false, + history_mode: Default::default(), model_provider: String::from("openai"), created_at: 1, updated_at: 2, @@ -5835,6 +5837,7 @@ session_picker_view = "dense" parent_thread_id: None, preview: String::from("preview"), ephemeral: false, + history_mode: Default::default(), model_provider: String::from("openai"), created_at: 1, updated_at: 2, @@ -5895,6 +5898,7 @@ session_picker_view = "dense" parent_thread_id: None, preview: String::from("preview"), ephemeral: false, + history_mode: Default::default(), model_provider: String::from("openai"), created_at: 1, updated_at: 2,