diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 8c1d700ef..ca247f2d5 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3673,6 +3673,13 @@ } ], "description": "Optional client-supplied analytics source classification for this forked thread." + }, + "lastTurnId": { + "description": "Optional last turn id to fork through, inclusive.\n\nWhen specified, turns after `last_turn_id` are omitted from the fork. The referenced turn cannot be in progress.", + "type": [ + "string", + "null" + ] } }, "required": [ @@ -6866,4 +6873,4 @@ } ], "title": "ClientRequest" -} \ No newline at end of file +} diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index b972de220..9f03be5b7 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -17287,6 +17287,13 @@ } ], "description": "Optional client-supplied analytics source classification for this forked thread." + }, + "lastTurnId": { + "description": "Optional last turn id to fork through, inclusive.\n\nWhen specified, turns after `last_turn_id` are omitted from the fork. The referenced turn cannot be in progress.", + "type": [ + "string", + "null" + ] } }, "required": [ @@ -20833,4 +20840,4 @@ }, "title": "CodexAppServerProtocol", "type": "object" -} \ No newline at end of file +} diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 4af580e9c..f7037ef83 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -15066,6 +15066,13 @@ } ], "description": "Optional client-supplied analytics source classification for this forked thread." + }, + "lastTurnId": { + "description": "Optional last turn id to fork through, inclusive.\n\nWhen specified, turns after `last_turn_id` are omitted from the fork. The referenced turn cannot be in progress.", + "type": [ + "string", + "null" + ] } }, "required": [ @@ -18611,4 +18618,4 @@ }, "title": "CodexAppServerProtocolV2", "type": "object" -} \ No newline at end of file +} diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json index ad0174cfe..7a00e3803 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json @@ -168,6 +168,13 @@ } ], "description": "Optional client-supplied analytics source classification for this forked thread." + }, + "lastTurnId": { + "description": "Optional last turn id to fork through, inclusive.\n\nWhen specified, turns after `last_turn_id` are omitted from the fork. The referenced turn cannot be in progress.", + "type": [ + "string", + "null" + ] } }, "required": [ @@ -175,4 +182,4 @@ ], "title": "ThreadForkParams", "type": "object" -} \ No newline at end of file +} diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts index c5109b2c7..3ace4d441 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts @@ -18,6 +18,12 @@ import type { ThreadSource } from "./ThreadSource"; * Prefer using thread_id whenever possible. */ export type ThreadForkParams = {threadId: string, /** + * Optional last turn id to fork through, inclusive. + * + * When specified, turns after `last_turn_id` are omitted from the fork. + * The referenced turn cannot be in progress. + */ +lastTurnId?: string | null, /** * Configuration overrides for the forked thread, if any. */ model?: string | null, modelProvider?: string | null, serviceTier?: string | null | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, /** diff --git a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs index ea1e0424d..930441c63 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs @@ -839,6 +839,30 @@ fn thread_path_params_deserialize_empty_path_as_none() { ); } +#[test] +fn thread_fork_last_turn_id_round_trips() { + let params: ThreadForkParams = serde_json::from_value(json!({ + "threadId": "thread-1", + "lastTurnId": "turn-2", + })) + .expect("thread/fork params deserialize"); + + assert_eq!(params.last_turn_id, Some("turn-2".to_string())); + let serialized = serde_json::to_value(params).expect("thread/fork params serialize"); + assert_eq!(serialized["lastTurnId"], json!("turn-2")); + + let omitted = serde_json::to_value(ThreadForkParams { + thread_id: "thread-1".to_string(), + ..Default::default() + }) + .expect("thread/fork params without last turn id serialize"); + assert_eq!( + omitted["lastTurnId"], + serde_json::Value::Null, + "optional lastTurnId should serialize as null when omitted" + ); +} + #[test] fn fs_get_metadata_response_round_trips_minimal_fields() { let response = FsGetMetadataResponse { diff --git a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs index 73335fce5..7cd03930d 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/thread.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/thread.rs @@ -494,6 +494,13 @@ impl From for TurnsPage { pub struct ThreadForkParams { pub thread_id: String, + /// Optional last turn id to fork through, inclusive. + /// + /// When specified, turns after `last_turn_id` are omitted from the fork. + /// The referenced turn cannot be in progress. + #[ts(optional = nullable)] + pub last_turn_id: Option, + /// [UNSTABLE] Specify the rollout path to fork from. /// If specified, the thread_id param will be ignored. #[experimental("thread/fork.path")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 78f46f5b1..6f34cae54 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -139,7 +139,7 @@ Example with notification opt-out: - `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread. When the request includes a `cwd` and the resolved sandbox is `workspace-write` or full access, app-server also marks that project as trusted in the user `config.toml`. Pass `sessionStartSource: "clear"` when starting a replacement thread after clearing the current session so `SessionStart` hooks receive `source: "clear"` instead of the default `"startup"`. Experimental `allowProviderModelFallback` lets providers backed by an authoritative static model catalog replace an unavailable requested `model` with the catalog default; dynamic or cached catalogs preserve the requested model. Experimental `runtimeWorkspaceRoots` replaces the thread-scoped runtime workspace roots used to materialize `:workspace_roots`; paths must be absolute. For permissions, prefer experimental `permissions` profile selection by id; the legacy `sandbox` shorthand is still accepted but cannot be combined with `permissions`. Deprecated experimental `multiAgentMode` is ignored; use Ultra reasoning effort for proactive multi-agent behavior. Experimental `environments` selects the sticky execution environments for turns on the thread; omit it to use the server default, pass `[]` to disable environments, or pass explicit environment ids with per-environment `cwd`. Experimental `selectedCapabilityRoots` selects environment-owned plugin or standalone-skill roots using environment-native absolute paths. Skills found below those roots are listed and read through the owning environment. Stdio MCP servers declared by selected plugins are started in that environment, and HTTP MCP connections use that environment's HTTP client. - `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. Accepts the same permission override rules as `thread/start`. -- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`. +- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; pass an optional `lastTurnId` to copy history only through that turn, inclusive, and drop later turns from the fork. An in-progress `lastTurnId` is rejected. If `lastTurnId` is null while the source thread is mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`. - `thread/start`, `thread/resume`, and `thread/fork` responses include the legacy `sandbox` compatibility projection. `instructionSources` lists loaded instruction files using each source environment's native absolute path syntax, including files loaded from remote environments. Experimental clients can read `runtimeWorkspaceRoots` for the thread-scoped runtime roots and `activePermissionProfile` for the named or implicit built-in profile identity/provenance when known. Their deprecated experimental `multiAgentMode` field, and the corresponding thread setting, always report `explicitRequestOnly`; Ultra reasoning effort is the source of proactive multi-agent behavior. - `thread/list` — page through stored threads; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Experimental clients can use `parentThreadId` for direct spawned children or `ancestorThreadId` for spawned descendants at any depth; the two filters are mutually exclusive. Review and Guardian threads are not included because they do not participate in that spawn-edge lifecycle. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. Subagent threads also include `parentThreadId` when the immediate parent is known. - `thread/loaded/list` — list the thread ids currently loaded in memory. diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 39c4b79dc..cb0f276d8 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -330,6 +330,7 @@ use codex_core::path_utils; #[cfg(test)] use codex_core::read_head_for_summary; use codex_core::sandboxing::SandboxPermissions; +use codex_core::truncate_rollout_after_turn_id; use codex_core::windows_sandbox::WindowsSandboxLevelExt; use codex_core::windows_sandbox::WindowsSandboxSetupMode as CoreWindowsSandboxSetupMode; use codex_core::windows_sandbox::WindowsSandboxSetupRequest; diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index d5302d9c4..b78c53a38 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -3387,6 +3387,7 @@ impl ThreadRequestProcessor { ) -> Result<(), JSONRPCErrorError> { let ThreadForkParams { thread_id, + last_turn_id, path, model, model_provider, @@ -3421,12 +3422,20 @@ impl ThreadRequestProcessor { let history_items = source_thread .history .take() - .map(|history| Arc::new(history.items)) + .map(|history| history.items) .ok_or_else(|| { internal_error(format!( "thread {source_thread_id} did not include persisted history" )) })?; + let history_items = if let Some(last_turn_id) = last_turn_id.as_deref() { + Arc::new( + truncate_rollout_after_turn_id(&history_items, last_turn_id) + .map_err(|err| core_thread_write_error("truncate thread for fork", err))?, + ) + } else { + Arc::new(history_items) + }; let history_cwd = Some(source_thread.cwd.clone()); // Persist Windows sandbox mode. diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index 4102b45d1..d7c1707a3 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -251,6 +251,119 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_fork_at_last_turn_id_keeps_only_terminal_prefix() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = TestAppServer::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams::default()) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { + thread: source_thread, + .. + } = to_response::(start_resp)?; + let source_thread_id = source_thread.id.clone(); + let source_path = source_thread.path.expect("source thread path"); + + let mut turn_ids = Vec::new(); + for text in ["first", "second", "third"] { + let turn_request_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: source_thread_id.clone(), + client_user_message_id: None, + input: vec![UserInput::Text { + text: text.to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_request_id)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + turn_ids.push(turn.id); + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + } + + let original_contents = std::fs::read_to_string(source_path.as_path())?; + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: source_thread_id.clone(), + last_turn_id: Some(turn_ids[1].clone()), + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { + thread: forked_thread, + .. + } = to_response::(fork_resp)?; + + assert_eq!( + forked_thread + .turns + .iter() + .map(|turn| turn.id.clone()) + .collect::>(), + turn_ids[..2] + ); + assert!( + forked_thread + .turns + .iter() + .all(|turn| turn.status == TurnStatus::Completed) + ); + assert_eq!(forked_thread.forked_from_id, Some(source_thread_id)); + assert_eq!(forked_thread.preview, "first"); + assert_eq!( + std::fs::read_to_string(source_path.as_path())?, + original_contents, + "forking at a turn must not mutate the source rollout" + ); + + let forked_path = forked_thread.path.clone().expect("forked thread path"); + let forked_contents = std::fs::read_to_string(forked_path.as_path())?; + assert!(forked_contents.contains(turn_ids[1].as_str())); + assert!(!forked_contents.contains(turn_ids[2].as_str())); + + let started = loop { + let notification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/started"), + ) + .await??; + let started: ThreadStartedNotification = + serde_json::from_value(notification.params.expect("params must be present"))?; + if started.thread.id == forked_thread.id { + break started; + } + }; + assert!(started.thread.turns.is_empty()); + + Ok(()) +} + #[tokio::test] async fn thread_fork_inherits_explicit_source_name_from_session_index() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index ad5f54123..e9b749fea 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -145,6 +145,7 @@ pub(crate) mod state_db_bridge; pub use state_db_bridge::StateDbHandle; pub use state_db_bridge::init_state_db; mod thread_rollout_truncation; +pub use thread_rollout_truncation::truncate_rollout_after_turn_id; mod tools; pub(crate) mod turn_diff_tracker; mod turn_metadata; diff --git a/codex-rs/core/src/thread_rollout_truncation.rs b/codex-rs/core/src/thread_rollout_truncation.rs index 25263b14b..d7d6ad998 100644 --- a/codex-rs/core/src/thread_rollout_truncation.rs +++ b/codex-rs/core/src/thread_rollout_truncation.rs @@ -5,6 +5,10 @@ use crate::context_manager::is_user_turn_boundary; use crate::event_mapping; +use codex_app_server_protocol::TurnStatus; +use codex_app_server_protocol::build_turns_from_rollout_items; +use codex_protocol::error::CodexErr; +use codex_protocol::error::Result as CodexResult; use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::EventMsg; @@ -149,6 +153,58 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start( items[..cut_idx].to_vec() } +/// Return a rollout prefix ending after the requested persisted terminal turn. +/// +/// The turn must still be present in the effective post-rollback history and +/// must have an explicit persisted TurnStarted boundary. Synthetic IDs +/// generated while projecting legacy rollouts are intentionally unsupported +/// because they do not provide a stable raw rollout boundary for a fork. +pub fn truncate_rollout_after_turn_id( + items: &[RolloutItem], + last_turn_id: &str, +) -> CodexResult> { + let turns = build_turns_from_rollout_items(items); + let turn = turns + .iter() + .find(|turn| turn.id == last_turn_id) + .ok_or_else(|| { + CodexErr::InvalidRequest(format!( + "lastTurnId '{last_turn_id}' was not found in the source thread" + )) + })?; + + let target_start_index = items + .iter() + .position(|item| { + matches!( + item, + RolloutItem::EventMsg(EventMsg::TurnStarted(event)) + if event.turn_id == last_turn_id + ) + }) + .ok_or_else(|| { + CodexErr::InvalidRequest(format!( + "lastTurnId '{last_turn_id}' is not a persisted canonical turn in the source thread" + )) + })?; + + if matches!(turn.status, TurnStatus::InProgress) { + return Err(CodexErr::InvalidRequest(format!( + "lastTurnId '{last_turn_id}' identifies an in-progress turn" + ))); + } + + let cut_index = items + .iter() + .enumerate() + .skip(target_start_index.saturating_add(1)) + .find_map(|(index, item)| { + matches!(item, RolloutItem::EventMsg(EventMsg::TurnStarted(_))).then_some(index) + }) + .unwrap_or(items.len()); + Ok(items[..cut_index].to_vec()) +} + /// Return a suffix of `items` that keeps the last `n_from_end` fork turns. /// /// If fewer than or equal to `n_from_end` fork turns exist, this keeps from the first fork-turn diff --git a/codex-rs/core/src/thread_rollout_truncation_tests.rs b/codex-rs/core/src/thread_rollout_truncation_tests.rs index 0da9763fc..16d10a827 100644 --- a/codex-rs/core/src/thread_rollout_truncation_tests.rs +++ b/codex-rs/core/src/thread_rollout_truncation_tests.rs @@ -6,6 +6,9 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::protocol::InterAgentCommunication; use codex_protocol::protocol::ThreadRolledBackEvent; +use codex_protocol::protocol::TurnCompleteEvent; +use codex_protocol::protocol::TurnStartedEvent; +use codex_protocol::protocol::UserMessageEvent; use pretty_assertions::assert_eq; use std::sync::Arc; @@ -66,6 +69,104 @@ fn inter_agent_communication(text: &str, trigger_turn: bool) -> RolloutItem { )) } +fn turn_started(turn_id: &str) -> RolloutItem { + RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn_id.to_string(), + trace_id: None, + started_at: None, + model_context_window: None, + collaboration_mode_kind: Default::default(), + })) +} + +fn turn_completed(turn_id: &str) -> RolloutItem { + RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: turn_id.to_string(), + last_agent_message: None, + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + })) +} + +#[test] +fn truncates_rollout_after_terminal_canonical_turn_id() { + let rollout = vec![ + turn_started("turn-1"), + turn_completed("turn-1"), + turn_started("turn-2"), + turn_completed("turn-2"), + turn_started("turn-3"), + turn_completed("turn-3"), + ]; + + let truncated = + truncate_rollout_after_turn_id(&rollout, "turn-2").expect("truncate through turn-2"); + + assert_eq!( + serde_json::to_value(&truncated).unwrap(), + serde_json::to_value(&rollout[..4]).unwrap() + ); +} + +#[test] +fn truncate_rollout_after_turn_id_rejects_rolled_back_turn() { + let rollout = vec![ + turn_started("turn-1"), + turn_completed("turn-1"), + turn_started("turn-2"), + turn_completed("turn-2"), + RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent { + num_turns: 1, + })), + turn_started("turn-3"), + turn_completed("turn-3"), + ]; + + let err = truncate_rollout_after_turn_id(&rollout, "turn-2") + .expect_err("rolled-back turn should not be a fork anchor"); + + assert!(matches!( + err, + CodexErr::InvalidRequest(message) + if message == "lastTurnId 'turn-2' was not found in the source thread" + )); +} + +#[test] +fn truncate_rollout_after_turn_id_rejects_synthetic_legacy_turn_id() { + let rollout = vec![RolloutItem::EventMsg(EventMsg::UserMessage( + UserMessageEvent { + message: "legacy".to_string(), + ..Default::default() + }, + ))]; + + let err = truncate_rollout_after_turn_id(&rollout, "rollout-0") + .expect_err("synthetic turn should not be a fork anchor"); + + assert!(matches!( + err, + CodexErr::InvalidRequest(message) + if message + == "lastTurnId 'rollout-0' is not a persisted canonical turn in the source thread" + )); +} + +#[test] +fn truncate_rollout_after_turn_id_rejects_in_progress_turn() { + let rollout = vec![turn_started("turn-1")]; + + let err = truncate_rollout_after_turn_id(&rollout, "turn-1") + .expect_err("in-progress turn should not be a fork anchor"); + + assert!(matches!( + err, + CodexErr::InvalidRequest(message) + if message == "lastTurnId 'turn-1' identifies an in-progress turn" + )); +} + #[test] fn truncates_rollout_from_start_before_nth_user_only() { let items = [