feat(app-server): add optional turn_id to thread/fork (#30277)

## Description

This adds stable optional `turnId` support to `thread/fork`. When
supplied, the fork copies persisted history through that terminal turn,
inclusive, and drops later turns from the new thread.

Omitting or passing `null` preserves the existing full-history fork
behavior, including the interruption marker when the stored source
history ends mid-turn.

## Why

We're deprecating `thread/rollback` and this will help certain UX use
cases work around it by using `thread/fork` + `turn_id` instead.
This commit is contained in:
Owen Lin
2026-06-26 12:35:54 -07:00
committed by GitHub
Unverified
parent 812cd2bb57
commit f72976a5f1
14 changed files with 352 additions and 6 deletions
+8 -1
View File
@@ -3673,6 +3673,13 @@
}
],
"description": "Optional client-supplied analytics source classification for this forked thread."
},
"lastTurnId": {
"description": "Optional last turn id to fork through, inclusive.\n\nWhen specified, turns after `last_turn_id` are omitted from the fork. The referenced turn cannot be in progress.",
"type": [
"string",
"null"
]
}
},
"required": [
@@ -6866,4 +6873,4 @@
}
],
"title": "ClientRequest"
}
}
@@ -17287,6 +17287,13 @@
}
],
"description": "Optional client-supplied analytics source classification for this forked thread."
},
"lastTurnId": {
"description": "Optional last turn id to fork through, inclusive.\n\nWhen specified, turns after `last_turn_id` are omitted from the fork. The referenced turn cannot be in progress.",
"type": [
"string",
"null"
]
}
},
"required": [
@@ -20833,4 +20840,4 @@
},
"title": "CodexAppServerProtocol",
"type": "object"
}
}
@@ -15066,6 +15066,13 @@
}
],
"description": "Optional client-supplied analytics source classification for this forked thread."
},
"lastTurnId": {
"description": "Optional last turn id to fork through, inclusive.\n\nWhen specified, turns after `last_turn_id` are omitted from the fork. The referenced turn cannot be in progress.",
"type": [
"string",
"null"
]
}
},
"required": [
@@ -18611,4 +18618,4 @@
},
"title": "CodexAppServerProtocolV2",
"type": "object"
}
}
@@ -168,6 +168,13 @@
}
],
"description": "Optional client-supplied analytics source classification for this forked thread."
},
"lastTurnId": {
"description": "Optional last turn id to fork through, inclusive.\n\nWhen specified, turns after `last_turn_id` are omitted from the fork. The referenced turn cannot be in progress.",
"type": [
"string",
"null"
]
}
},
"required": [
@@ -175,4 +182,4 @@
],
"title": "ThreadForkParams",
"type": "object"
}
}
@@ -18,6 +18,12 @@ import type { ThreadSource } from "./ThreadSource";
* Prefer using thread_id whenever possible.
*/
export type ThreadForkParams = {threadId: string, /**
* Optional last turn id to fork through, inclusive.
*
* When specified, turns after `last_turn_id` are omitted from the fork.
* The referenced turn cannot be in progress.
*/
lastTurnId?: string | null, /**
* Configuration overrides for the forked thread, if any.
*/
model?: string | null, modelProvider?: string | null, serviceTier?: string | null | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, /**
@@ -839,6 +839,30 @@ fn thread_path_params_deserialize_empty_path_as_none() {
);
}
#[test]
fn thread_fork_last_turn_id_round_trips() {
let params: ThreadForkParams = serde_json::from_value(json!({
"threadId": "thread-1",
"lastTurnId": "turn-2",
}))
.expect("thread/fork params deserialize");
assert_eq!(params.last_turn_id, Some("turn-2".to_string()));
let serialized = serde_json::to_value(params).expect("thread/fork params serialize");
assert_eq!(serialized["lastTurnId"], json!("turn-2"));
let omitted = serde_json::to_value(ThreadForkParams {
thread_id: "thread-1".to_string(),
..Default::default()
})
.expect("thread/fork params without last turn id serialize");
assert_eq!(
omitted["lastTurnId"],
serde_json::Value::Null,
"optional lastTurnId should serialize as null when omitted"
);
}
#[test]
fn fs_get_metadata_response_round_trips_minimal_fields() {
let response = FsGetMetadataResponse {
@@ -494,6 +494,13 @@ impl From<ThreadTurnsListResponse> for TurnsPage {
pub struct ThreadForkParams {
pub thread_id: String,
/// Optional last turn id to fork through, inclusive.
///
/// When specified, turns after `last_turn_id` are omitted from the fork.
/// The referenced turn cannot be in progress.
#[ts(optional = nullable)]
pub last_turn_id: Option<String>,
/// [UNSTABLE] Specify the rollout path to fork from.
/// If specified, the thread_id param will be ignored.
#[experimental("thread/fork.path")]
+1 -1
View File
@@ -139,7 +139,7 @@ Example with notification opt-out:
- `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread. When the request includes a `cwd` and the resolved sandbox is `workspace-write` or full access, app-server also marks that project as trusted in the user `config.toml`. Pass `sessionStartSource: "clear"` when starting a replacement thread after clearing the current session so `SessionStart` hooks receive `source: "clear"` instead of the default `"startup"`. Experimental `allowProviderModelFallback` lets providers backed by an authoritative static model catalog replace an unavailable requested `model` with the catalog default; dynamic or cached catalogs preserve the requested model. Experimental `runtimeWorkspaceRoots` replaces the thread-scoped runtime workspace roots used to materialize `:workspace_roots`; paths must be absolute. For permissions, prefer experimental `permissions` profile selection by id; the legacy `sandbox` shorthand is still accepted but cannot be combined with `permissions`. Deprecated experimental `multiAgentMode` is ignored; use Ultra reasoning effort for proactive multi-agent behavior. Experimental `environments` selects the sticky execution environments for turns on the thread; omit it to use the server default, pass `[]` to disable environments, or pass explicit environment ids with per-environment `cwd`. Experimental `selectedCapabilityRoots` selects environment-owned plugin or standalone-skill roots using environment-native absolute paths. Skills found below those roots are listed and read through the owning environment. Stdio MCP servers declared by selected plugins are started in that environment, and HTTP MCP connections use that environment's HTTP client.
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. Accepts the same permission override rules as `thread/start`.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; pass an optional `lastTurnId` to copy history only through that turn, inclusive, and drop later turns from the fork. An in-progress `lastTurnId` is rejected. If `lastTurnId` is null while the source thread is mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Experimental clients can pass `excludeTurns: true` when they plan to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`.
- `thread/start`, `thread/resume`, and `thread/fork` responses include the legacy `sandbox` compatibility projection. `instructionSources` lists loaded instruction files using each source environment's native absolute path syntax, including files loaded from remote environments. Experimental clients can read `runtimeWorkspaceRoots` for the thread-scoped runtime roots and `activePermissionProfile` for the named or implicit built-in profile identity/provenance when known. Their deprecated experimental `multiAgentMode` field, and the corresponding thread setting, always report `explicitRequestOnly`; Ultra reasoning effort is the source of proactive multi-agent behavior.
- `thread/list` — page through stored threads; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Experimental clients can use `parentThreadId` for direct spawned children or `ancestorThreadId` for spawned descendants at any depth; the two filters are mutually exclusive. Review and Guardian threads are not included because they do not participate in that spawn-edge lifecycle. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. Subagent threads also include `parentThreadId` when the immediate parent is known.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
@@ -330,6 +330,7 @@ use codex_core::path_utils;
#[cfg(test)]
use codex_core::read_head_for_summary;
use codex_core::sandboxing::SandboxPermissions;
use codex_core::truncate_rollout_after_turn_id;
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_core::windows_sandbox::WindowsSandboxSetupMode as CoreWindowsSandboxSetupMode;
use codex_core::windows_sandbox::WindowsSandboxSetupRequest;
@@ -3387,6 +3387,7 @@ impl ThreadRequestProcessor {
) -> Result<(), JSONRPCErrorError> {
let ThreadForkParams {
thread_id,
last_turn_id,
path,
model,
model_provider,
@@ -3421,12 +3422,20 @@ impl ThreadRequestProcessor {
let history_items = source_thread
.history
.take()
.map(|history| Arc::new(history.items))
.map(|history| history.items)
.ok_or_else(|| {
internal_error(format!(
"thread {source_thread_id} did not include persisted history"
))
})?;
let history_items = if let Some(last_turn_id) = last_turn_id.as_deref() {
Arc::new(
truncate_rollout_after_turn_id(&history_items, last_turn_id)
.map_err(|err| core_thread_write_error("truncate thread for fork", err))?,
)
} else {
Arc::new(history_items)
};
let history_cwd = Some(source_thread.cwd.clone());
// Persist Windows sandbox mode.
@@ -251,6 +251,119 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_fork_at_last_turn_id_keeps_only_terminal_prefix() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse {
thread: source_thread,
..
} = to_response::<ThreadStartResponse>(start_resp)?;
let source_thread_id = source_thread.id.clone();
let source_path = source_thread.path.expect("source thread path");
let mut turn_ids = Vec::new();
for text in ["first", "second", "third"] {
let turn_request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: source_thread_id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_request_id)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
turn_ids.push(turn.id);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
}
let original_contents = std::fs::read_to_string(source_path.as_path())?;
let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: source_thread_id.clone(),
last_turn_id: Some(turn_ids[1].clone()),
..Default::default()
})
.await?;
let fork_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
)
.await??;
let ThreadForkResponse {
thread: forked_thread,
..
} = to_response::<ThreadForkResponse>(fork_resp)?;
assert_eq!(
forked_thread
.turns
.iter()
.map(|turn| turn.id.clone())
.collect::<Vec<_>>(),
turn_ids[..2]
);
assert!(
forked_thread
.turns
.iter()
.all(|turn| turn.status == TurnStatus::Completed)
);
assert_eq!(forked_thread.forked_from_id, Some(source_thread_id));
assert_eq!(forked_thread.preview, "first");
assert_eq!(
std::fs::read_to_string(source_path.as_path())?,
original_contents,
"forking at a turn must not mutate the source rollout"
);
let forked_path = forked_thread.path.clone().expect("forked thread path");
let forked_contents = std::fs::read_to_string(forked_path.as_path())?;
assert!(forked_contents.contains(turn_ids[1].as_str()));
assert!(!forked_contents.contains(turn_ids[2].as_str()));
let started = loop {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/started"),
)
.await??;
let started: ThreadStartedNotification =
serde_json::from_value(notification.params.expect("params must be present"))?;
if started.thread.id == forked_thread.id {
break started;
}
};
assert!(started.thread.turns.is_empty());
Ok(())
}
#[tokio::test]
async fn thread_fork_inherits_explicit_source_name_from_session_index() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
+1
View File
@@ -145,6 +145,7 @@ pub(crate) mod state_db_bridge;
pub use state_db_bridge::StateDbHandle;
pub use state_db_bridge::init_state_db;
mod thread_rollout_truncation;
pub use thread_rollout_truncation::truncate_rollout_after_turn_id;
mod tools;
pub(crate) mod turn_diff_tracker;
mod turn_metadata;
@@ -5,6 +5,10 @@
use crate::context_manager::is_user_turn_boundary;
use crate::event_mapping;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
@@ -149,6 +153,58 @@ pub(crate) fn truncate_rollout_before_nth_user_message_from_start(
items[..cut_idx].to_vec()
}
/// Return a rollout prefix ending after the requested persisted terminal turn.
///
/// The turn must still be present in the effective post-rollback history and
/// must have an explicit persisted TurnStarted boundary. Synthetic IDs
/// generated while projecting legacy rollouts are intentionally unsupported
/// because they do not provide a stable raw rollout boundary for a fork.
pub fn truncate_rollout_after_turn_id(
items: &[RolloutItem],
last_turn_id: &str,
) -> CodexResult<Vec<RolloutItem>> {
let turns = build_turns_from_rollout_items(items);
let turn = turns
.iter()
.find(|turn| turn.id == last_turn_id)
.ok_or_else(|| {
CodexErr::InvalidRequest(format!(
"lastTurnId '{last_turn_id}' was not found in the source thread"
))
})?;
let target_start_index = items
.iter()
.position(|item| {
matches!(
item,
RolloutItem::EventMsg(EventMsg::TurnStarted(event))
if event.turn_id == last_turn_id
)
})
.ok_or_else(|| {
CodexErr::InvalidRequest(format!(
"lastTurnId '{last_turn_id}' is not a persisted canonical turn in the source thread"
))
})?;
if matches!(turn.status, TurnStatus::InProgress) {
return Err(CodexErr::InvalidRequest(format!(
"lastTurnId '{last_turn_id}' identifies an in-progress turn"
)));
}
let cut_index = items
.iter()
.enumerate()
.skip(target_start_index.saturating_add(1))
.find_map(|(index, item)| {
matches!(item, RolloutItem::EventMsg(EventMsg::TurnStarted(_))).then_some(index)
})
.unwrap_or(items.len());
Ok(items[..cut_index].to_vec())
}
/// Return a suffix of `items` that keeps the last `n_from_end` fork turns.
///
/// If fewer than or equal to `n_from_end` fork turns exist, this keeps from the first fork-turn
@@ -6,6 +6,9 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use pretty_assertions::assert_eq;
use std::sync::Arc;
@@ -66,6 +69,104 @@ fn inter_agent_communication(text: &str, trigger_turn: bool) -> RolloutItem {
))
}
fn turn_started(turn_id: &str) -> RolloutItem {
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
trace_id: None,
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}))
}
fn turn_completed(turn_id: &str) -> RolloutItem {
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_id.to_string(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
time_to_first_token_ms: None,
}))
}
#[test]
fn truncates_rollout_after_terminal_canonical_turn_id() {
let rollout = vec![
turn_started("turn-1"),
turn_completed("turn-1"),
turn_started("turn-2"),
turn_completed("turn-2"),
turn_started("turn-3"),
turn_completed("turn-3"),
];
let truncated =
truncate_rollout_after_turn_id(&rollout, "turn-2").expect("truncate through turn-2");
assert_eq!(
serde_json::to_value(&truncated).unwrap(),
serde_json::to_value(&rollout[..4]).unwrap()
);
}
#[test]
fn truncate_rollout_after_turn_id_rejects_rolled_back_turn() {
let rollout = vec![
turn_started("turn-1"),
turn_completed("turn-1"),
turn_started("turn-2"),
turn_completed("turn-2"),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 1,
})),
turn_started("turn-3"),
turn_completed("turn-3"),
];
let err = truncate_rollout_after_turn_id(&rollout, "turn-2")
.expect_err("rolled-back turn should not be a fork anchor");
assert!(matches!(
err,
CodexErr::InvalidRequest(message)
if message == "lastTurnId 'turn-2' was not found in the source thread"
));
}
#[test]
fn truncate_rollout_after_turn_id_rejects_synthetic_legacy_turn_id() {
let rollout = vec![RolloutItem::EventMsg(EventMsg::UserMessage(
UserMessageEvent {
message: "legacy".to_string(),
..Default::default()
},
))];
let err = truncate_rollout_after_turn_id(&rollout, "rollout-0")
.expect_err("synthetic turn should not be a fork anchor");
assert!(matches!(
err,
CodexErr::InvalidRequest(message)
if message
== "lastTurnId 'rollout-0' is not a persisted canonical turn in the source thread"
));
}
#[test]
fn truncate_rollout_after_turn_id_rejects_in_progress_turn() {
let rollout = vec![turn_started("turn-1")];
let err = truncate_rollout_after_turn_id(&rollout, "turn-1")
.expect_err("in-progress turn should not be a fork anchor");
assert!(matches!(
err,
CodexErr::InvalidRequest(message)
if message == "lastTurnId 'turn-1' identifies an in-progress turn"
));
}
#[test]
fn truncates_rollout_from_start_before_nth_user_only() {
let items = [