app-server: remove experimental persist_extended_history bool flag (#25712)

## Summary

Remove the dead experimental `persistExtendedHistory` app-server flag
and collapse rollout persistence to the single policy app-server already
used.

## What Changed

- Removed `persistExtendedHistory` from v2 thread start/resume/fork
params and deleted its deprecation notice path.
- Removed the persistence-mode enums and plumbing through core, rollout,
and thread-store.
- Made rollout filtering mode-free, keeping the existing limited
persisted-history behavior.

## Test Plan

- `just write-app-server-schema`
- `cargo nextest run --no-fail-fast -p codex-app-server-protocol
schema_fixtures`
- `cargo nextest run --no-fail-fast -p codex-app-server
thread_shell_command_history_responses_exclude_persisted_command_executions`
- `cargo nextest run --no-fail-fast -p codex-rollout -p
codex-thread-store`
- final `rg` for removed flag/type names
This commit is contained in:
Owen Lin
2026-06-01 16:33:42 -07:00
committed by GitHub
Unverified
parent bca18cba40
commit 11e0f3d3ae
47 changed files with 24 additions and 363 deletions
-1
View File
@@ -3598,7 +3598,6 @@ dependencies = [
"codex-protocol",
"codex-state",
"codex-utils-path",
"codex-utils-string",
"pretty_assertions",
"regex",
"serde",
@@ -164,12 +164,6 @@ pub struct ThreadStartParams {
#[experimental("thread/start.experimentalRawEvents")]
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub experimental_raw_events: bool,
/// Deprecated and ignored by app-server. Kept only so older clients can
/// continue sending the field while rollout persistence always uses the
/// limited history policy.
#[experimental("thread/start.persistFullHistory")]
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub persist_extended_history: bool,
}
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
@@ -402,12 +396,6 @@ pub struct ThreadResumeParams {
#[experimental("thread/resume.initialTurnsPage")]
#[ts(optional = nullable)]
pub initial_turns_page: Option<ThreadResumeInitialTurnsPageParams>,
/// Deprecated and ignored by app-server. Kept only so older clients can
/// continue sending the field while rollout persistence always uses the
/// limited history policy.
#[experimental("thread/resume.persistFullHistory")]
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub persist_extended_history: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, ExperimentalApi)]
@@ -557,12 +545,6 @@ pub struct ThreadForkParams {
#[experimental("thread/fork.excludeTurns")]
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub exclude_turns: bool,
/// Deprecated and ignored by app-server. Kept only so older clients can
/// continue sending the field while rollout persistence always uses the
/// limited history policy.
#[experimental("thread/fork.persistFullHistory")]
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub persist_extended_history: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, ExperimentalApi)]
@@ -50,7 +50,6 @@ use codex_app_server_protocol::CommandExecWriteParams;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::ConversationGitInfo;
use codex_app_server_protocol::ConversationSummary;
use codex_app_server_protocol::DeprecationNoticeNotification;
use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec;
use codex_app_server_protocol::EnvironmentAddParams;
use codex_app_server_protocol::EnvironmentAddResponse;
@@ -404,7 +403,6 @@ use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_rmcp_client::perform_oauth_login_return_url;
use codex_rollout::EventPersistenceMode;
use codex_rollout::is_persisted_rollout_item;
use codex_rollout::state_db::StateDbHandle;
use codex_rollout::state_db::reconcile_rollout;
@@ -536,7 +534,7 @@ pub(crate) use self::thread_summary::thread_settings_from_core_snapshot;
pub(crate) fn build_api_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
let mut builder = ThreadHistoryBuilder::new();
for item in items {
if is_persisted_rollout_item(item, EventPersistenceMode::Limited) {
if is_persisted_rollout_item(item) {
builder.handle_rollout_item(item);
}
}
@@ -309,7 +309,6 @@ impl ExternalAgentConfigRequestProcessor {
session_source: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments,
@@ -5,10 +5,6 @@ use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_WORKSPACE;
const THREAD_LIST_DEFAULT_LIMIT: usize = 25;
const THREAD_LIST_MAX_LIMIT: usize = 100;
const PERSIST_EXTENDED_HISTORY_DEPRECATION_SUMMARY: &str =
"persistExtendedHistory is deprecated and ignored";
const PERSIST_EXTENDED_HISTORY_DEPRECATION_DETAILS: &str =
"Remove this parameter. App-server always uses limited history persistence.";
struct ThreadListFilters {
model_providers: Option<Vec<String>>,
@@ -849,17 +845,12 @@ impl ThreadRequestProcessor {
session_start_source,
thread_source,
environments,
persist_extended_history,
} = params;
if sandbox.is_some() && permissions.is_some() {
return Err(invalid_request(
"`permissions` cannot be combined with `sandbox`",
));
}
if persist_extended_history {
self.send_persist_extended_history_deprecation_notice(request_id.connection_id)
.await;
}
let environment_selections = self.parse_environment_selections(environments)?;
let mut typesafe_overrides = self.build_thread_config_overrides(
model,
@@ -952,18 +943,6 @@ impl ThreadRequestProcessor {
self.outgoing.request_trace_context(request_id).await
}
async fn send_persist_extended_history_deprecation_notice(&self, connection_id: ConnectionId) {
self.outgoing
.send_server_notification_to_connections(
&[connection_id],
ServerNotification::DeprecationNotice(DeprecationNoticeNotification {
summary: PERSIST_EXTENDED_HISTORY_DEPRECATION_SUMMARY.to_string(),
details: Some(PERSIST_EXTENDED_HISTORY_DEPRECATION_DETAILS.to_string()),
}),
)
.await;
}
async fn submit_core_op(
&self,
request_id: &ConnectionRequestId,
@@ -1106,7 +1085,6 @@ impl ThreadRequestProcessor {
session_source: None,
thread_source,
dynamic_tools: core_dynamic_tools,
persist_extended_history: false,
metrics_service_name: service_name,
parent_trace: request_trace,
environments,
@@ -1115,7 +1093,6 @@ impl ThreadRequestProcessor {
"app_server.thread_start.create_thread",
otel.name = "app_server.thread_start.create_thread",
thread_start.dynamic_tool_count = core_dynamic_tool_count,
thread_start.persist_extended_history = false,
))
.await
.map_err(|err| match err {
@@ -2446,10 +2423,6 @@ impl ThreadRequestProcessor {
.await;
return Ok(());
}
if params.persist_extended_history {
self.send_persist_extended_history_deprecation_notice(request_id.connection_id)
.await;
}
let redact_resume_payloads =
should_redact_thread_resume_payloads(app_server_client_name.as_deref());
@@ -2496,7 +2469,6 @@ impl ThreadRequestProcessor {
personality,
exclude_turns,
initial_turns_page,
persist_extended_history: _persist_extended_history,
} = params;
let include_turns = !exclude_turns;
@@ -2561,7 +2533,6 @@ impl ThreadRequestProcessor {
config.clone(),
thread_history,
self.auth_manager.clone(),
/*persist_extended_history*/ false,
self.request_trace_context(&request_id).await,
)
.await
@@ -3191,7 +3162,6 @@ impl ThreadRequestProcessor {
ephemeral,
thread_source,
exclude_turns,
persist_extended_history,
} = params;
let include_turns = !exclude_turns;
if sandbox.is_some() && permissions.is_some() {
@@ -3199,11 +3169,6 @@ impl ThreadRequestProcessor {
"`permissions` cannot be combined with `sandbox`",
));
}
if persist_extended_history {
self.send_persist_extended_history_deprecation_notice(request_id.connection_id)
.await;
}
let source_thread = self
.read_stored_thread_for_resume(&thread_id, path.as_ref(), /*include_history*/ true)
.await?;
@@ -3282,7 +3247,6 @@ impl ThreadRequestProcessor {
rollout_path: source_thread.rollout_path.clone(),
}),
thread_source.map(Into::into),
/*persist_extended_history*/ false,
self.request_trace_context(&request_id).await,
)
.await
@@ -656,7 +656,6 @@ mod thread_processor_behavior_tests {
personality: None,
exclude_turns: false,
initial_turns_page: None,
persist_extended_history: false,
};
let config_snapshot = ThreadConfigSnapshot {
model: "gpt-5".to_string(),
@@ -1096,7 +1096,6 @@ impl TurnRequestProcessor {
rollout_path: parent_thread.rollout_path(),
}),
/*thread_source*/ None,
/*persist_extended_history*/ false,
self.request_trace_context(request_id).await,
)
.await
@@ -26,7 +26,6 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_thread_store::CreateThreadParams;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadPersistenceMetadata;
use codex_thread_store::ThreadStore;
use codex_utils_absolute_path::AbsolutePathBuf;
@@ -134,7 +133,6 @@ async fn get_conversation_summary_by_thread_id_reads_pathless_store_thread() ->
model_provider: "test-provider".to_string(),
memory_mode: ThreadMemoryMode::Disabled,
},
event_persistence_mode: ThreadEventPersistenceMode::default(),
})
.await?;
@@ -795,7 +795,6 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<(
environments: None,
mock_experimental_field: None,
experimental_raw_events: false,
persist_extended_history: false,
})
.await?;
let _: JSONRPCResponse = timeout(
@@ -59,7 +59,6 @@ use codex_protocol::user_input::TextElement;
use codex_thread_store::AppendThreadItemsParams;
use codex_thread_store::CreateThreadParams;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadMetadataPatch;
use codex_thread_store::ThreadPersistenceMetadata;
use codex_thread_store::ThreadStore;
@@ -1368,7 +1367,6 @@ async fn seed_pathless_store_thread(
model_provider: "test-provider".to_string(),
memory_mode: ThreadMemoryMode::Disabled,
},
event_persistence_mode: ThreadEventPersistenceMode::default(),
})
.await?;
store
@@ -64,10 +64,7 @@ async fn thread_shell_command_history_responses_exclude_persisted_command_execut
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
persist_extended_history: true,
..Default::default()
})
.send_thread_start_request(ThreadStartParams::default())
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
@@ -259,10 +256,7 @@ async fn thread_shell_command_uses_existing_active_turn() -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
persist_extended_history: true,
..Default::default()
})
.send_thread_start_request(ThreadStartParams::default())
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
@@ -6,7 +6,6 @@ use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use codex_app_server_protocol::AskForApproval;
use codex_app_server_protocol::DeprecationNoticeNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
@@ -52,46 +51,6 @@ use super::analytics::wait_for_analytics_payload;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test]
async fn thread_start_deprecates_persist_extended_history_true() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let req_id = mcp
.send_thread_start_request(ThreadStartParams {
persist_extended_history: true,
..Default::default()
})
.await?;
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("deprecationNotice"),
)
.await??;
let notice: DeprecationNoticeNotification = serde_json::from_value(
notification
.params
.expect("deprecationNotice params should be present"),
)?;
assert_eq!(
notice.summary,
"persistExtendedHistory is deprecated and ignored"
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
Ok(())
}
#[tokio::test]
async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
// Provide a mock server and config so model wiring is valid.
@@ -35,7 +35,6 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_thread_store::CreateThreadParams;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadMetadataPatch;
use codex_thread_store::ThreadPersistenceMetadata;
use codex_thread_store::ThreadStore;
@@ -221,7 +220,6 @@ async fn thread_unarchive_preserves_pathless_store_metadata() -> Result<()> {
model_provider: "test-provider".to_string(),
memory_mode: ThreadMemoryMode::Disabled,
},
event_persistence_mode: ThreadEventPersistenceMode::default(),
})
.await?;
store
-2
View File
@@ -270,7 +270,6 @@ impl AgentControl {
options.parent_thread_id,
/*forked_from_thread_id*/ None,
/*thread_source*/ Some(ThreadSource::Subagent),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
@@ -495,7 +494,6 @@ impl AgentControl {
/*thread_source*/ Some(ThreadSource::Subagent),
/*parent_thread_id*/ Some(parent_thread_id),
/*forked_from_thread_id*/ Some(parent_thread_id),
/*persist_extended_history*/ false,
inherited_shell_snapshot,
inherited_exec_policy,
options.environments.clone(),
-1
View File
@@ -93,7 +93,6 @@ pub(crate) async fn run_codex_thread_interactive(
thread_source: Some(ThreadSource::Subagent),
agent_control: parent_session.services.agent_control.clone(),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
inherited_shell_snapshot: None,
user_shell_override: None,
-1
View File
@@ -144,7 +144,6 @@ mod turn_metadata;
mod turn_timing;
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
pub use rollout::Cursor;
pub use rollout::EventPersistenceMode;
pub use rollout::INTERACTIVE_SESSION_SOURCES;
pub use rollout::RolloutRecorder;
pub use rollout::RolloutRecorderParams;
-1
View File
@@ -1,7 +1,6 @@
use crate::config::Config;
pub use codex_rollout::ARCHIVED_SESSIONS_SUBDIR;
pub use codex_rollout::Cursor;
pub use codex_rollout::EventPersistenceMode;
pub use codex_rollout::INTERACTIVE_SESSION_SOURCES;
pub use codex_rollout::RolloutRecorder;
pub use codex_rollout::RolloutRecorderParams;
-4
View File
@@ -137,7 +137,6 @@ use codex_thread_store::LiveThreadInitGuard;
use codex_thread_store::LocalThreadStore;
use codex_thread_store::ReadThreadParams;
use codex_thread_store::ResumeThreadParams;
use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadPersistenceMetadata;
use codex_thread_store::ThreadStore;
use codex_utils_output_truncation::TruncationPolicy;
@@ -405,7 +404,6 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) thread_source: Option<ThreadSource>,
pub(crate) agent_control: AgentControl,
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
pub(crate) persist_extended_history: bool,
pub(crate) metrics_service_name: Option<String>,
pub(crate) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
pub(crate) inherited_exec_policy: Option<Arc<ExecPolicyManager>>,
@@ -471,7 +469,6 @@ impl Codex {
thread_source,
agent_control,
dynamic_tools,
persist_extended_history,
metrics_service_name,
inherited_shell_snapshot,
user_shell_override,
@@ -599,7 +596,6 @@ impl Codex {
parent_thread_id,
thread_source,
dynamic_tools,
persist_extended_history,
inherited_shell_snapshot,
user_shell_override,
};
-8
View File
@@ -102,7 +102,6 @@ pub(crate) struct SessionConfiguration {
/// Optional analytics source classification for this thread.
pub(super) thread_source: Option<ThreadSource>,
pub(super) dynamic_tools: Vec<DynamicToolSpec>,
pub(super) persist_extended_history: bool,
pub(super) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
pub(super) user_shell_override: Option<shell::Shell>,
}
@@ -519,11 +518,6 @@ impl Session {
.or_else(|| initial_history.get_resumed_parent_thread_id());
session_configuration.parent_thread_id = parent_thread_id;
let event_persistence_mode = if session_configuration.persist_extended_history {
ThreadEventPersistenceMode::Extended
} else {
ThreadEventPersistenceMode::Limited
};
let thread_id = match &initial_history {
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => {
ThreadId::default()
@@ -573,7 +567,6 @@ impl Session {
ThreadMemoryMode::Disabled
},
},
event_persistence_mode,
},
)
.await?
@@ -595,7 +588,6 @@ impl Session {
ThreadMemoryMode::Disabled
},
},
event_persistence_mode,
},
)
.await?
-11
View File
@@ -2397,7 +2397,6 @@ async fn fork_startup_context_then_first_turn_diff_snapshot() -> anyhow::Result<
fork_config.clone(),
rollout_path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await?;
@@ -3100,7 +3099,6 @@ async fn set_rate_limits_retains_previous_credits() {
parent_thread_id: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
};
@@ -3206,7 +3204,6 @@ async fn set_rate_limits_updates_plan_type_when_present() {
parent_thread_id: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
};
@@ -3466,7 +3463,6 @@ async fn attach_thread_persistence(session: &mut Session) -> PathBuf {
ThreadMemoryMode::Disabled
},
},
event_persistence_mode: ThreadEventPersistenceMode::Limited,
},
)
.await
@@ -3736,7 +3732,6 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati
parent_thread_id: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
}
@@ -4481,7 +4476,6 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() {
parent_thread_id: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
};
@@ -4592,7 +4586,6 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
parent_thread_id: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
};
@@ -4829,7 +4822,6 @@ async fn make_session_with_config_and_rx(
parent_thread_id: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
};
@@ -4934,7 +4926,6 @@ async fn make_session_with_history_source_and_agent_control_and_rx(
parent_thread_id: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
};
@@ -5968,7 +5959,6 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() {
ThreadMemoryMode::Disabled
},
},
event_persistence_mode: ThreadEventPersistenceMode::Limited,
},
)
.await
@@ -6440,7 +6430,6 @@ where
parent_thread_id: None,
thread_source: None,
dynamic_tools,
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
};
@@ -697,7 +697,6 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
thread_source: None,
agent_control: AgentControl::default(),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
inherited_shell_snapshot: None,
inherited_exec_policy: Some(Arc::new(parent_exec_policy)),
+3 -39
View File
@@ -177,7 +177,6 @@ pub struct StartThreadOptions {
pub session_source: Option<SessionSource>,
pub thread_source: Option<ThreadSource>,
pub dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
pub persist_extended_history: bool,
pub metrics_service_name: Option<String>,
pub parent_trace: Option<W3cTraceContext>,
pub environments: Vec<TurnEnvironmentSelection>,
@@ -554,19 +553,13 @@ impl ThreadManager {
pub async fn start_thread(&self, config: Config) -> CodexResult<NewThread> {
// Box delegated thread-spawn futures so these convenience wrappers do
// not inline the full spawn path into every caller's async state.
Box::pin(self.start_thread_with_tools(
config,
Vec::new(),
/*persist_extended_history*/ false,
))
.await
Box::pin(self.start_thread_with_tools(config, Vec::new())).await
}
pub async fn start_thread_with_tools(
&self,
config: Config,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
let environments = default_thread_environment_selections(
self.state.environment_manager.as_ref(),
@@ -578,7 +571,6 @@ impl ThreadManager {
session_source: None,
thread_source: None,
dynamic_tools,
persist_extended_history,
metrics_service_name: None,
parent_trace: None,
environments,
@@ -615,7 +607,6 @@ impl ThreadManager {
forked_from_thread_id,
thread_source,
options.dynamic_tools,
options.persist_extended_history,
options.metrics_service_name,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
@@ -669,7 +660,6 @@ impl ThreadManager {
config,
initial_history,
auth_manager,
/*persist_extended_history*/ false,
parent_trace,
))
.await
@@ -680,7 +670,6 @@ impl ThreadManager {
config: Config,
initial_history: InitialHistory,
auth_manager: Arc<AuthManager>,
persist_extended_history: bool,
parent_trace: Option<W3cTraceContext>,
) -> CodexResult<NewThread> {
let environments = default_thread_environment_selections(
@@ -700,7 +689,6 @@ impl ThreadManager {
/*forked_from_thread_id*/ None,
thread_source,
Vec::new(),
persist_extended_history,
/*metrics_service_name*/ None,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
@@ -729,7 +717,6 @@ impl ThreadManager {
/*forked_from_thread_id*/ None,
/*thread_source*/ None,
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*parent_trace*/ None,
environments,
@@ -763,7 +750,6 @@ impl ThreadManager {
/*forked_from_thread_id*/ None,
thread_source,
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
@@ -842,7 +828,6 @@ impl ThreadManager {
config: Config,
path: PathBuf,
thread_source: Option<ThreadSource>,
persist_extended_history: bool,
parent_trace: Option<W3cTraceContext>,
) -> CodexResult<NewThread>
where
@@ -850,15 +835,8 @@ impl ThreadManager {
{
let snapshot = snapshot.into();
let history = self.initial_history_from_rollout_path(path).await?;
self.fork_thread_from_history(
snapshot,
config,
history,
thread_source,
persist_extended_history,
parent_trace,
)
.await
self.fork_thread_from_history(snapshot, config, history, thread_source, parent_trace)
.await
}
async fn initial_history_from_rollout_path(
@@ -886,7 +864,6 @@ impl ThreadManager {
config: Config,
history: InitialHistory,
thread_source: Option<ThreadSource>,
persist_extended_history: bool,
parent_trace: Option<W3cTraceContext>,
) -> CodexResult<NewThread>
where
@@ -897,7 +874,6 @@ impl ThreadManager {
config,
history,
thread_source,
persist_extended_history,
parent_trace,
)
.await
@@ -909,7 +885,6 @@ impl ThreadManager {
config: Config,
history: InitialHistory,
thread_source: Option<ThreadSource>,
persist_extended_history: bool,
parent_trace: Option<W3cTraceContext>,
) -> CodexResult<NewThread> {
// `forked_from_id()` describes this history's existing lineage. When
@@ -934,7 +909,6 @@ impl ThreadManager {
forked_from_thread_id,
thread_source,
Vec::new(),
persist_extended_history,
/*metrics_service_name*/ None,
parent_trace,
environments,
@@ -1057,7 +1031,6 @@ impl ThreadManagerState {
/*parent_thread_id*/ None,
/*forked_from_thread_id*/ None,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
@@ -1075,7 +1048,6 @@ impl ThreadManagerState {
parent_thread_id: Option<ThreadId>,
forked_from_thread_id: Option<ThreadId>,
thread_source: Option<ThreadSource>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
@@ -1094,7 +1066,6 @@ impl ThreadManagerState {
forked_from_thread_id,
thread_source,
Vec::new(),
persist_extended_history,
metrics_service_name,
inherited_shell_snapshot,
inherited_exec_policy,
@@ -1131,7 +1102,6 @@ impl ThreadManagerState {
/*forked_from_thread_id*/ None,
thread_source,
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
@@ -1152,7 +1122,6 @@ impl ThreadManagerState {
thread_source: Option<ThreadSource>,
parent_thread_id: Option<ThreadId>,
forked_from_thread_id: Option<ThreadId>,
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
environments: Option<Vec<TurnEnvironmentSelection>>,
@@ -1170,7 +1139,6 @@ impl ThreadManagerState {
forked_from_thread_id,
thread_source,
Vec::new(),
persist_extended_history,
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
@@ -1193,7 +1161,6 @@ impl ThreadManagerState {
forked_from_thread_id: Option<ThreadId>,
thread_source: Option<ThreadSource>,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
parent_trace: Option<W3cTraceContext>,
environments: Vec<TurnEnvironmentSelection>,
@@ -1209,7 +1176,6 @@ impl ThreadManagerState {
forked_from_thread_id,
thread_source,
dynamic_tools,
persist_extended_history,
metrics_service_name,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
@@ -1232,7 +1198,6 @@ impl ThreadManagerState {
forked_from_thread_id: Option<ThreadId>,
thread_source: Option<ThreadSource>,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
@@ -1287,7 +1252,6 @@ impl ThreadManagerState {
thread_source,
agent_control,
dynamic_tools,
persist_extended_history,
metrics_service_name,
inherited_shell_snapshot,
inherited_exec_policy,
-16
View File
@@ -172,7 +172,6 @@ fn fork_thread_accepts_legacy_usize_snapshot_argument() {
config,
path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
);
}
@@ -328,7 +327,6 @@ async fn start_thread_rejects_explicit_local_environment_when_default_provider_i
session_source: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: vec![TurnEnvironmentSelection {
@@ -462,7 +460,6 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() {
)),
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
@@ -518,7 +515,6 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
session_source: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: environments.clone(),
@@ -574,7 +570,6 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
config,
rollout_path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -790,7 +785,6 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() {
session_source: None,
thread_source: Some(ThreadSource::User),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
@@ -897,7 +891,6 @@ async fn rollout_path_resume_and_fork_read_history_through_thread_store() {
rollout_path: Some(rollout_path.clone()),
}),
auth_manager.clone(),
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -926,7 +919,6 @@ async fn rollout_path_resume_and_fork_read_history_through_thread_store() {
config,
rollout_path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -1204,7 +1196,6 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
RolloutItem::ResponseItem(assistant_msg("partial")),
]),
auth_manager,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -1227,7 +1218,6 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
config.clone(),
source_path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -1319,7 +1309,6 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
RolloutItem::ResponseItem(assistant_msg("partial")),
]),
auth_manager,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -1347,7 +1336,6 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
config.clone(),
source_path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -1410,7 +1398,6 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
RolloutItem::ResponseItem(assistant_msg("partial")),
]),
auth_manager,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -1431,7 +1418,6 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
config.clone(),
source_path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -1472,7 +1458,6 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
config.clone(),
forked_path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -1550,7 +1535,6 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> {
config.clone(),
InitialHistory::Forked(vec![RolloutItem::ResponseItem(user_msg("keep working"))]),
auth_manager.clone(),
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -2707,7 +2707,6 @@ async fn resume_agent_restores_closed_agent_and_accepts_send_input() {
phase: None,
})]),
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")),
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
-1
View File
@@ -3032,7 +3032,6 @@ async fn code_mode_can_call_hidden_dynamic_tools() -> Result<()> {
}),
defer_loading: true,
}],
/*persist_extended_history*/ false,
)
.await?;
let mut test = base_test;
+1 -5
View File
@@ -1154,11 +1154,7 @@ async fn remote_compact_filters_deferred_dynamic_tools() -> Result<()> {
];
let new_thread = test
.thread_manager
.start_thread_with_tools(
test.config.clone(),
dynamic_tools,
/*persist_extended_history*/ false,
)
.start_thread_with_tools(test.config.clone(), dynamic_tools)
.await?;
test.codex = new_thread.thread;
test.session_configured = new_thread.session_configured;
@@ -850,7 +850,6 @@ async fn fork_thread(
config.clone(),
path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
))
.await
-3
View File
@@ -104,7 +104,6 @@ async fn fork_thread_twice_drops_to_first_message() {
config_for_fork.clone(),
base_path.clone(),
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -129,7 +128,6 @@ async fn fork_thread_twice_drops_to_first_message() {
config_for_fork.clone(),
fork1_path.clone(),
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -204,7 +202,6 @@ async fn fork_thread_from_history_does_not_require_source_rollout_path() {
rollout_path: None,
}),
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -490,7 +490,6 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> {
fork_config.clone(),
rollout_path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await?;
@@ -1886,7 +1886,6 @@ async fn conversation_startup_context_current_thread_selects_many_turns_by_budge
test.config.clone(),
InitialHistory::Forked(history),
auth_manager_from_auth(CodexAuth::from_api_key("dummy")),
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await?;
@@ -107,7 +107,6 @@ async fn emits_warning_when_resumed_model_differs() {
config.clone(),
initial_history,
auth_manager,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
+2 -10
View File
@@ -926,11 +926,7 @@ async fn tool_search_returns_deferred_dynamic_tool_and_routes_follow_up_call() -
let base_test = builder.build(&server).await?;
let new_thread = base_test
.thread_manager
.start_thread_with_tools(
base_test.config.clone(),
vec![dynamic_tool],
/*persist_extended_history*/ false,
)
.start_thread_with_tools(base_test.config.clone(), vec![dynamic_tool])
.await?;
let mut test = base_test;
test.codex = new_thread.thread;
@@ -1559,11 +1555,7 @@ async fn tool_search_matches_dynamic_tools_by_name_description_namespace_and_sch
let base_test = builder.build(&server).await?;
let new_thread = base_test
.thread_manager
.start_thread_with_tools(
base_test.config.clone(),
vec![dynamic_tool],
/*persist_extended_history*/ false,
)
.start_thread_with_tools(base_test.config.clone(), vec![dynamic_tool])
.await?;
let mut test = base_test;
test.codex = new_thread.thread;
+1 -5
View File
@@ -127,11 +127,7 @@ async fn resume_restores_dynamic_tools_from_rollout_with_sqlite_enabled() -> Res
let base_test = builder.build(&server).await?;
let started = base_test
.thread_manager
.start_thread_with_tools(
base_test.config.clone(),
vec![dynamic_tool.clone()],
/*persist_extended_history*/ false,
)
.start_thread_with_tools(base_test.config.clone(), vec![dynamic_tool.clone()])
.await?;
let rollout_path = started
.session_configured
@@ -752,7 +752,6 @@ async fn subagent_stop_replaces_stop_and_skips_internal_subagents() -> Result<()
session_source: Some(SessionSource::SubAgent(SubAgentSource::Review)),
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
@@ -46,7 +46,6 @@ async fn emits_warning_when_unstable_features_enabled_via_config() {
config.clone(),
InitialHistory::New,
auth_manager,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -93,7 +92,6 @@ async fn suppresses_warning_when_configured() {
config.clone(),
InitialHistory::New,
auth_manager,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
@@ -73,7 +73,6 @@ async fn window_id_advances_after_compact_persists_on_resume_and_resets_on_fork(
resumed.config.clone(),
rollout_path,
/*thread_source*/ None,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await?;
-1
View File
@@ -248,7 +248,6 @@ impl MemoryStartupContext {
)),
thread_source: Some(ThreadSource::MemoryConsolidation),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments,
-1
View File
@@ -23,7 +23,6 @@ codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-state = { workspace = true }
codex-utils-path = { workspace = true }
codex-utils-string = { workspace = true }
regex = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
-1
View File
@@ -60,7 +60,6 @@ pub use list::read_session_meta_line;
pub use list::read_thread_item_from_rollout;
pub use list::rollout_date_parts;
pub use metadata::builder_from_items;
pub use policy::EventPersistenceMode;
pub use policy::is_persisted_rollout_item;
pub use policy::persisted_rollout_items;
pub use policy::should_persist_response_item_for_memories;
+13 -78
View File
@@ -1,23 +1,12 @@
use crate::protocol::EventMsg;
use crate::protocol::RolloutItem;
use codex_protocol::models::ResponseItem;
use codex_utils_string::truncate_middle_chars;
const PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES: usize = 10_000;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum EventPersistenceMode {
#[default]
Limited,
Extended,
}
/// Whether a rollout `item` should be persisted in rollout files for the
/// provided persistence `mode`.
pub fn is_persisted_rollout_item(item: &RolloutItem, mode: EventPersistenceMode) -> bool {
/// Whether a rollout `item` should be persisted in rollout files.
pub fn is_persisted_rollout_item(item: &RolloutItem) -> bool {
match item {
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode),
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev),
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
true
@@ -26,42 +15,16 @@ pub fn is_persisted_rollout_item(item: &RolloutItem, mode: EventPersistenceMode)
}
/// Return the canonical rollout items that should be persisted for a live append.
pub fn persisted_rollout_items(
items: &[RolloutItem],
mode: EventPersistenceMode,
) -> Vec<RolloutItem> {
pub fn persisted_rollout_items(items: &[RolloutItem]) -> Vec<RolloutItem> {
let mut persisted = Vec::new();
for item in items {
if is_persisted_rollout_item(item, mode) {
persisted.push(sanitize_rollout_item_for_persistence(item.clone(), mode));
if is_persisted_rollout_item(item) {
persisted.push(item.clone());
}
}
persisted
}
fn sanitize_rollout_item_for_persistence(
item: RolloutItem,
mode: EventPersistenceMode,
) -> RolloutItem {
if mode != EventPersistenceMode::Extended {
return item;
}
match item {
RolloutItem::EventMsg(EventMsg::ExecCommandEnd(mut event)) => {
event.aggregated_output = truncate_middle_chars(
&event.aggregated_output,
PERSISTED_EXEC_AGGREGATED_OUTPUT_MAX_BYTES,
);
event.stdout.clear();
event.stderr.clear();
event.formatted_output.clear();
RolloutItem::EventMsg(EventMsg::ExecCommandEnd(event))
}
_ => item,
}
}
/// Whether a `ResponseItem` should be persisted in rollout files.
#[inline]
pub fn should_persist_response_item(item: &ResponseItem) -> bool {
@@ -106,33 +69,9 @@ pub fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool {
}
}
/// Whether an `EventMsg` should be persisted in rollout files for the
/// provided persistence `mode`.
/// Whether an `EventMsg` should be persisted in rollout files.
#[inline]
pub fn should_persist_event_msg(ev: &EventMsg, mode: EventPersistenceMode) -> bool {
match mode {
EventPersistenceMode::Limited => should_persist_event_msg_limited(ev),
EventPersistenceMode::Extended => should_persist_event_msg_extended(ev),
}
}
fn should_persist_event_msg_limited(ev: &EventMsg) -> bool {
matches!(
event_msg_persistence_mode(ev),
Some(EventPersistenceMode::Limited)
)
}
fn should_persist_event_msg_extended(ev: &EventMsg) -> bool {
matches!(
event_msg_persistence_mode(ev),
Some(EventPersistenceMode::Limited) | Some(EventPersistenceMode::Extended)
)
}
/// Returns the minimum persistence mode that includes this event.
/// `None` means the event should never be persisted.
fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
pub fn should_persist_event_msg(ev: &EventMsg) -> bool {
match ev {
EventMsg::UserMessage(_)
| EventMsg::AgentMessage(_)
@@ -150,16 +89,12 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
| EventMsg::TurnStarted(_)
| EventMsg::TurnComplete(_)
| EventMsg::WebSearchEnd(_)
| EventMsg::ImageGenerationEnd(_) => Some(EventPersistenceMode::Limited),
| EventMsg::ImageGenerationEnd(_) => true,
EventMsg::ItemCompleted(event) => {
// Plan items are derived from streaming tags and are not part of the
// raw ResponseItem history, so we persist their completion to replay
// them on resume without bloating rollouts with every item lifecycle.
if matches!(event.item, codex_protocol::items::TurnItem::Plan(_)) {
Some(EventPersistenceMode::Limited)
} else {
None
}
matches!(event.item, codex_protocol::items::TurnItem::Plan(_))
}
EventMsg::Error(_)
| EventMsg::GuardianAssessment(_)
@@ -171,8 +106,8 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
| EventMsg::CollabCloseEnd(_)
| EventMsg::CollabResumeEnd(_)
| EventMsg::DynamicToolCallRequest(_)
| EventMsg::DynamicToolCallResponse(_) => Some(EventPersistenceMode::Extended),
EventMsg::Warning(_)
| EventMsg::DynamicToolCallResponse(_)
| EventMsg::Warning(_)
| EventMsg::GuardianWarning(_)
| EventMsg::RealtimeConversationStarted(_)
| EventMsg::RealtimeConversationSdp(_)
@@ -216,6 +151,6 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
| EventMsg::CollabAgentInteractionBegin(_)
| EventMsg::CollabWaitingBegin(_)
| EventMsg::CollabCloseBegin(_)
| EventMsg::CollabResumeBegin(_) => None,
| EventMsg::CollabResumeBegin(_) => false,
}
}
-1
View File
@@ -43,7 +43,6 @@ pub use types::StoredTurn;
pub use types::StoredTurnError;
pub use types::StoredTurnItemsView;
pub use types::StoredTurnStatus;
pub use types::ThreadEventPersistenceMode;
pub use types::ThreadMetadataPatch;
pub use types::ThreadPage;
pub use types::ThreadPersistenceMetadata;
+1 -15
View File
@@ -4,7 +4,6 @@ use std::sync::Arc;
use codex_protocol::ThreadId;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_rollout::EventPersistenceMode;
use codex_rollout::persisted_rollout_items;
use tokio::sync::Mutex;
use tracing::warn;
@@ -17,7 +16,6 @@ use crate::ReadThreadParams;
use crate::ResumeThreadParams;
use crate::StoredThread;
use crate::StoredThreadHistory;
use crate::ThreadEventPersistenceMode;
use crate::ThreadMetadataPatch;
use crate::ThreadStore;
use crate::ThreadStoreResult;
@@ -33,7 +31,6 @@ use crate::thread_metadata_sync::ThreadMetadataSync;
pub struct LiveThread {
thread_id: ThreadId,
thread_store: Arc<dyn ThreadStore>,
event_persistence_mode: EventPersistenceMode,
metadata_sync: Arc<Mutex<ThreadMetadataSync>>,
}
@@ -92,13 +89,11 @@ impl LiveThread {
params: CreateThreadParams,
) -> ThreadStoreResult<Self> {
let thread_id = params.thread_id;
let event_persistence_mode = event_persistence_mode(params.event_persistence_mode);
let metadata_sync = ThreadMetadataSync::for_create(&params).await;
thread_store.create_thread(params).await?;
Ok(Self {
thread_id,
thread_store,
event_persistence_mode,
metadata_sync: Arc::new(Mutex::new(metadata_sync)),
})
}
@@ -108,7 +103,6 @@ impl LiveThread {
mut params: ResumeThreadParams,
) -> ThreadStoreResult<Self> {
let thread_id = params.thread_id;
let event_persistence_mode = event_persistence_mode(params.event_persistence_mode);
let should_load_history = params.history.is_none();
let include_archived = params.include_archived;
thread_store.resume_thread(params.clone()).await?;
@@ -131,13 +125,12 @@ impl LiveThread {
Ok(Self {
thread_id,
thread_store,
event_persistence_mode,
metadata_sync: Arc::new(Mutex::new(metadata_sync)),
})
}
pub async fn append_items(&self, items: &[RolloutItem]) -> ThreadStoreResult<()> {
let canonical_items = persisted_rollout_items(items, self.event_persistence_mode);
let canonical_items = persisted_rollout_items(items);
if canonical_items.is_empty() {
return Ok(());
}
@@ -301,10 +294,3 @@ impl LiveThread {
Ok(())
}
}
fn event_persistence_mode(mode: ThreadEventPersistenceMode) -> EventPersistenceMode {
match mode {
ThreadEventPersistenceMode::Limited => EventPersistenceMode::Limited,
ThreadEventPersistenceMode::Extended => EventPersistenceMode::Extended,
}
}
-10
View File
@@ -305,7 +305,6 @@ mod tests {
use super::*;
use crate::LiveThread;
use crate::ThreadEventPersistenceMode;
use crate::ThreadPersistenceMetadata;
use crate::local::test_support::test_config;
use crate::local::test_support::write_archived_session_file;
@@ -539,7 +538,6 @@ mod tests {
model_provider: "different-provider".to_string(),
memory_mode: ThreadMemoryMode::Enabled,
},
event_persistence_mode: ThreadEventPersistenceMode::Limited,
},
)
.await
@@ -594,7 +592,6 @@ mod tests {
model_provider: "different-provider".to_string(),
memory_mode: ThreadMemoryMode::Enabled,
},
event_persistence_mode: ThreadEventPersistenceMode::Limited,
},
)
.await
@@ -720,7 +717,6 @@ mod tests {
history: None,
include_archived: true,
metadata: thread_metadata(),
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await
.expect("resume live thread");
@@ -781,7 +777,6 @@ mod tests {
history: None,
include_archived: true,
metadata: thread_metadata(),
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await
.expect_err("duplicate live resume should fail");
@@ -808,7 +803,6 @@ mod tests {
model_provider: "test-provider".to_string(),
memory_mode: ThreadMemoryMode::Enabled,
},
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await
.expect_err("missing cwd should fail");
@@ -834,7 +828,6 @@ mod tests {
history: None,
include_archived: true,
metadata: thread_metadata(),
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await
.expect("resume live thread");
@@ -883,7 +876,6 @@ mod tests {
history: None,
include_archived: true,
metadata: thread_metadata(),
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await
.expect("resume live thread");
@@ -922,7 +914,6 @@ mod tests {
history: None,
include_archived: true,
metadata: thread_metadata(),
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await
.expect("resume live archived thread");
@@ -1029,7 +1020,6 @@ mod tests {
base_instructions: BaseInstructions::default(),
dynamic_tools: Vec::new(),
metadata: thread_metadata(),
event_persistence_mode: ThreadEventPersistenceMode::Limited,
}
}
@@ -625,7 +625,6 @@ mod tests {
use crate::ListThreadsParams;
use crate::ResumeThreadParams;
use crate::SortDirection;
use crate::ThreadEventPersistenceMode;
use crate::ThreadMetadataPatch;
use crate::ThreadPersistenceMetadata;
use crate::ThreadSortKey;
@@ -788,7 +787,6 @@ mod tests {
history: None,
include_archived: true,
metadata: test_thread_metadata(),
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await
.expect("resume external live thread");
@@ -1590,7 +1588,6 @@ mod tests {
history: None,
include_archived: true,
metadata: test_thread_metadata(),
event_persistence_mode: ThreadEventPersistenceMode::Limited,
})
.await
.expect("resume archived live thread");
@@ -382,7 +382,6 @@ mod tests {
use pretty_assertions::assert_eq;
use super::*;
use crate::ThreadEventPersistenceMode;
use crate::ThreadPersistenceMetadata;
#[test]
@@ -529,7 +528,6 @@ mod tests {
model_provider: "test-provider".to_string(),
memory_mode: ThreadMemoryMode::Enabled,
},
event_persistence_mode: ThreadEventPersistenceMode::Limited,
}
}
-14
View File
@@ -42,16 +42,6 @@ mod optional_option {
}
}
/// Controls how many event variants should be persisted for future replay.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ThreadEventPersistenceMode {
/// Persist only the legacy minimal replay surface.
#[default]
Limited,
/// Persist the richer event surface used by app-server history reconstruction.
Extended,
}
/// Thread-scoped metadata used when opening live persistence.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ThreadPersistenceMetadata {
@@ -84,8 +74,6 @@ pub struct CreateThreadParams {
pub dynamic_tools: Vec<DynamicToolSpec>,
/// Metadata captured for the newly created thread.
pub metadata: ThreadPersistenceMetadata,
/// Whether persistence should include the extended event surface.
pub event_persistence_mode: ThreadEventPersistenceMode,
}
/// Parameters required to reopen persistence for an existing thread.
@@ -101,8 +89,6 @@ pub struct ResumeThreadParams {
pub include_archived: bool,
/// Metadata for future writes appended to the resumed live thread.
pub metadata: ThreadPersistenceMetadata,
/// Whether persistence should include the extended event surface.
pub event_persistence_mode: ThreadEventPersistenceMode,
}
/// Parameters for appending rollout items to a live thread.
-3
View File
@@ -1418,7 +1418,6 @@ fn thread_start_params_from_config(
ephemeral: Some(config.ephemeral),
session_start_source,
thread_source: Some(ThreadSource::User),
persist_extended_history: false,
..ThreadStartParams::default()
}
}
@@ -1457,7 +1456,6 @@ fn thread_resume_params_from_config(
sandbox,
permissions,
config: config_request_overrides_from_config(&config),
persist_extended_history: false,
..ThreadResumeParams::default()
}
}
@@ -1500,7 +1498,6 @@ fn thread_fork_params_from_config(
developer_instructions: config.developer_instructions.clone(),
ephemeral: config.ephemeral,
thread_source: Some(ThreadSource::User),
persist_extended_history: false,
..ThreadForkParams::default()
}
}