[codex] Add user input client ids (#24653)

## Summary

Adds an optional `clientId` field to app-server v2 `UserInput` and
carries it through the core `UserInput` model so clients can correlate
echoed user input items without relying on payload equality.

## Details

- Adds `client_id: Option<String>` to core `UserInput` variants.
- Exposes the v2 app-server field as `clientId` on the wire and in
generated TypeScript.
- Preserves the id when converting between app-server v2 and core
protocol types.
- Regenerates app-server schema fixtures.

## Validation

- `just fmt`
- `just write-app-server-schema`
- `cargo test -p codex-app-server-protocol`
- `cargo test -p codex-protocol`
- `just fix -p codex-app-server-protocol`
- `just fix -p codex-protocol`
- `git diff --check`
This commit is contained in:
Alexi Christakis
2026-05-28 14:54:39 -07:00
committed by GitHub
Unverified
parent a027135bc6
commit e92c952b2e
113 changed files with 793 additions and 97 deletions
@@ -272,6 +272,7 @@ fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest
request_id: RequestId::Integer(request_id),
params: TurnStartParams {
thread_id: thread_id.to_string(),
client_user_message_id: None,
input: vec![
UserInput::Text {
text: "hello".to_string(),
@@ -391,6 +392,7 @@ fn sample_turn_steer_request(
params: TurnSteerParams {
thread_id: thread_id.to_string(),
expected_turn_id: expected_turn_id.to_string(),
client_user_message_id: None,
input: vec![
UserInput::Text {
text: "more".to_string(),
+2
View File
@@ -89,6 +89,7 @@ fn sample_turn_start_request() -> ClientRequest {
request_id: RequestId::Integer(1),
params: TurnStartParams {
thread_id: "thread-1".to_string(),
client_user_message_id: None,
input: Vec::new(),
..Default::default()
},
@@ -101,6 +102,7 @@ fn sample_turn_steer_request() -> ClientRequest {
params: TurnSteerParams {
thread_id: "thread-1".to_string(),
expected_turn_id: "turn-1".to_string(),
client_user_message_id: None,
input: Vec::new(),
responsesapi_client_metadata: None,
additional_context: None,
@@ -3985,6 +3985,12 @@
],
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
},
"clientUserMessageId": {
"type": [
"string",
"null"
]
},
"cwd": {
"description": "Override the working directory for this turn and subsequent turns.",
"type": [
@@ -4071,6 +4077,12 @@
},
"TurnSteerParams": {
"properties": {
"clientUserMessageId": {
"type": [
"string",
"null"
]
},
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"
@@ -3561,6 +3561,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -15996,6 +15996,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/v2/UserInput"
@@ -18413,6 +18419,12 @@
],
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
},
"clientUserMessageId": {
"type": [
"string",
"null"
]
},
"cwd": {
"description": "Override the working directory for this turn and subsequent turns.",
"type": [
@@ -18540,6 +18552,12 @@
"TurnSteerParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"clientUserMessageId": {
"type": [
"string",
"null"
]
},
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"
@@ -13820,6 +13820,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -16237,6 +16243,12 @@
],
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
},
"clientUserMessageId": {
"type": [
"string",
"null"
]
},
"cwd": {
"description": "Override the working directory for this turn and subsequent turns.",
"type": [
@@ -16364,6 +16376,12 @@
"TurnSteerParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"clientUserMessageId": {
"type": [
"string",
"null"
]
},
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"
@@ -500,6 +500,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -500,6 +500,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -644,6 +644,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -1121,6 +1121,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -936,6 +936,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -936,6 +936,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -936,6 +936,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -1121,6 +1121,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -936,6 +936,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -1121,6 +1121,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -936,6 +936,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -936,6 +936,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -644,6 +644,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -516,6 +516,12 @@
],
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
},
"clientUserMessageId": {
"type": [
"string",
"null"
]
},
"cwd": {
"description": "Override the working directory for this turn and subsequent turns.",
"type": [
@@ -644,6 +644,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -644,6 +644,12 @@
"oneOf": [
{
"properties": {
"clientId": {
"type": [
"string",
"null"
]
},
"content": {
"items": {
"$ref": "#/definitions/UserInput"
@@ -218,6 +218,12 @@
}
},
"properties": {
"clientUserMessageId": {
"type": [
"string",
"null"
]
},
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"
@@ -23,7 +23,7 @@ import type { PatchApplyStatus } from "./PatchApplyStatus";
import type { UserInput } from "./UserInput";
import type { WebSearchAction } from "./WebSearchAction";
export type ThreadItem = { "type": "userMessage", id: string, content: Array<UserInput>, } | { "type": "hookPrompt", id: string, fragments: Array<HookPromptFragment>, } | { "type": "agentMessage", id: string, text: string, phase: MessagePhase | null, memoryCitation: MemoryCitation | null, } | { "type": "plan", id: string, text: string, } | { "type": "reasoning", id: string, summary: Array<string>, content: Array<string>, } | { "type": "commandExecution", id: string,
export type ThreadItem = { "type": "userMessage", id: string, clientId: string | null, content: Array<UserInput>, } | { "type": "hookPrompt", id: string, fragments: Array<HookPromptFragment>, } | { "type": "agentMessage", id: string, text: string, phase: MessagePhase | null, memoryCitation: MemoryCitation | null, } | { "type": "plan", id: string, text: string, } | { "type": "reasoning", id: string, summary: Array<string>, content: Array<string>, } | { "type": "commandExecution", id: string,
/**
* The command to be executed.
*/
@@ -10,7 +10,7 @@ import type { AskForApproval } from "./AskForApproval";
import type { SandboxPolicy } from "./SandboxPolicy";
import type { UserInput } from "./UserInput";
export type TurnStartParams = {threadId: string, input: Array<UserInput>, /**
export type TurnStartParams = {threadId: string, clientUserMessageId?: string | null, input: Array<UserInput>, /**
* Override the working directory for this turn and subsequent turns.
*/
cwd?: string | null, /**
@@ -3,7 +3,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { UserInput } from "./UserInput";
export type TurnSteerParams = {threadId: string, input: Array<UserInput>, /**
export type TurnSteerParams = {threadId: string, clientUserMessageId?: string | null, input: Array<UserInput>, /**
* Required active turn id precondition. The request fails when it does not
* match the currently active turn.
*/
@@ -278,7 +278,11 @@ impl ThreadHistoryBuilder {
.unwrap_or_else(|| self.new_turn(/*id*/ None));
let id = self.next_item_id();
let content = self.build_user_inputs(payload);
turn.items.push(ThreadItem::UserMessage { id, content });
turn.items.push(ThreadItem::UserMessage {
id,
client_id: payload.client_id.clone(),
content,
});
self.current_turn = Some(turn);
}
@@ -1246,6 +1250,7 @@ mod tests {
fn builds_multiple_turns_with_reasoning_items() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "First turn".into(),
images: Some(vec!["https://example.com/one.png".into()]),
text_elements: Vec::new(),
@@ -1264,6 +1269,7 @@ mod tests {
text: "full reasoning".into(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "Second turn".into(),
images: None,
text_elements: Vec::new(),
@@ -1292,6 +1298,7 @@ mod tests {
first.items[0],
ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![
UserInput::Text {
text: "First turn".into(),
@@ -1330,6 +1337,7 @@ mod tests {
second.items[0],
ThreadItem::UserMessage {
id: "item-4".into(),
client_id: None,
content: vec![UserInput::Text {
text: "Second turn".into(),
text_elements: Vec::new(),
@@ -1352,6 +1360,7 @@ mod tests {
let local_path = PathBuf::from("/tmp/local.png");
let events = vec![RolloutItem::EventMsg(EventMsg::UserMessage(
UserMessageEvent {
client_id: None,
message: "inspect these".into(),
images: Some(vec!["https://example.com/image.png".into()]),
image_details: vec![Some(ImageDetail::Original)],
@@ -1368,6 +1377,7 @@ mod tests {
turns[0].items[0],
ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![
UserInput::Text {
text: "inspect these".into(),
@@ -1399,6 +1409,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello".into(),
images: None,
text_elements: Vec::new(),
@@ -1410,6 +1421,7 @@ mod tests {
turn_id: turn_id.to_string(),
item: CoreTurnItem::UserMessage(CoreUserMessageItem {
id: "user-item-id".to_string(),
client_id: None,
content: Vec::new(),
}),
started_at_ms: 0,
@@ -1434,6 +1446,7 @@ mod tests {
turns[0].items[0],
ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
@@ -1442,6 +1455,67 @@ mod tests {
);
}
#[test]
fn preserves_user_message_client_id_from_legacy_event() {
let turn_id = "turn-1";
let thread_id = ThreadId::new();
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
trace_id: None,
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
EventMsg::ItemStarted(ItemStartedEvent {
thread_id,
turn_id: turn_id.to_string(),
item: CoreTurnItem::UserMessage(CoreUserMessageItem {
id: "user-item-id".to_string(),
client_id: Some("client-message-1".to_string()),
content: vec![codex_protocol::user_input::UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
}),
started_at_ms: 0,
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: Some("client-message-1".to_string()),
message: "hello".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
..Default::default()
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_id.to_string(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
time_to_first_token_ms: None,
}),
];
let items = events
.into_iter()
.map(RolloutItem::EventMsg)
.collect::<Vec<_>>();
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
assert_eq!(
turns[0].items,
vec![ThreadItem::UserMessage {
id: "item-1".into(),
client_id: Some("client-message-1".to_string()),
content: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
}]
);
}
#[test]
fn preserves_agent_message_phase_in_history() {
let events = vec![EventMsg::AgentMessage(AgentMessageEvent {
@@ -1478,6 +1552,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
})),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "generate an image".into(),
images: None,
text_elements: Vec::new(),
@@ -1515,6 +1590,7 @@ mod tests {
items: vec![
ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![UserInput::Text {
text: "generate an image".into(),
text_elements: Vec::new(),
@@ -1536,6 +1612,7 @@ mod tests {
fn splits_reasoning_when_interleaved() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "Turn start".into(),
images: None,
text_elements: Vec::new(),
@@ -1589,6 +1666,7 @@ mod tests {
fn marks_turn_as_interrupted_when_aborted() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "Please do the thing".into(),
images: None,
text_elements: Vec::new(),
@@ -1607,6 +1685,7 @@ mod tests {
duration_ms: None,
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "Let's try again".into(),
images: None,
text_elements: Vec::new(),
@@ -1634,6 +1713,7 @@ mod tests {
first_turn.items[0],
ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![UserInput::Text {
text: "Please do the thing".into(),
text_elements: Vec::new(),
@@ -1657,6 +1737,7 @@ mod tests {
second_turn.items[0],
ThreadItem::UserMessage {
id: "item-3".into(),
client_id: None,
content: vec![UserInput::Text {
text: "Let's try again".into(),
text_elements: Vec::new(),
@@ -1678,6 +1759,7 @@ mod tests {
fn drops_last_turns_on_thread_rollback() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "First".into(),
images: None,
text_elements: Vec::new(),
@@ -1690,6 +1772,7 @@ mod tests {
memory_citation: None,
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "Second".into(),
images: None,
text_elements: Vec::new(),
@@ -1703,6 +1786,7 @@ mod tests {
}),
EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 1 }),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "Third".into(),
images: None,
text_elements: Vec::new(),
@@ -1732,6 +1816,7 @@ mod tests {
vec![
ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![UserInput::Text {
text: "First".into(),
text_elements: Vec::new(),
@@ -1750,6 +1835,7 @@ mod tests {
vec![
ThreadItem::UserMessage {
id: "item-3".into(),
client_id: None,
content: vec![UserInput::Text {
text: "Third".into(),
text_elements: Vec::new(),
@@ -1769,6 +1855,7 @@ mod tests {
fn thread_rollback_clears_all_turns_when_num_turns_exceeds_history() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "One".into(),
images: None,
text_elements: Vec::new(),
@@ -1781,6 +1868,7 @@ mod tests {
memory_citation: None,
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "Two".into(),
images: None,
text_elements: Vec::new(),
@@ -1814,6 +1902,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "Start".into(),
images: None,
text_elements: Vec::new(),
@@ -1821,6 +1910,7 @@ mod tests {
..Default::default()
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "Steer".into(),
images: None,
text_elements: Vec::new(),
@@ -1848,6 +1938,7 @@ mod tests {
vec![
ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![UserInput::Text {
text: "Start".into(),
text_elements: Vec::new(),
@@ -1855,6 +1946,7 @@ mod tests {
},
ThreadItem::UserMessage {
id: "item-2".into(),
client_id: None,
content: vec![UserInput::Text {
text: "Steer".into(),
text_elements: Vec::new(),
@@ -1875,6 +1967,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "run tools".into(),
images: None,
text_elements: Vec::new(),
@@ -2054,6 +2147,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "run dynamic tool".into(),
images: None,
text_elements: Vec::new(),
@@ -2121,6 +2215,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "run tools".into(),
images: None,
text_elements: Vec::new(),
@@ -2212,6 +2307,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "review this command".into(),
images: None,
text_elements: Vec::new(),
@@ -2297,6 +2393,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "run a subcommand".into(),
images: None,
text_elements: Vec::new(),
@@ -2362,6 +2459,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "first".into(),
images: None,
text_elements: Vec::new(),
@@ -2383,6 +2481,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "second".into(),
images: None,
text_elements: Vec::new(),
@@ -2458,6 +2557,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "first".into(),
images: None,
text_elements: Vec::new(),
@@ -2479,6 +2579,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "second".into(),
images: None,
text_elements: Vec::new(),
@@ -2528,6 +2629,7 @@ mod tests {
turns[1].items[0],
ThreadItem::UserMessage {
id: "item-2".into(),
client_id: None,
content: vec![UserInput::Text {
text: "second".into(),
text_elements: Vec::new(),
@@ -2549,6 +2651,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "apply patch".into(),
images: None,
text_elements: Vec::new(),
@@ -2584,6 +2687,7 @@ mod tests {
vec![
ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![UserInput::Text {
text: "apply patch".into(),
text_elements: Vec::new(),
@@ -2615,6 +2719,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "apply patch".into(),
images: None,
text_elements: Vec::new(),
@@ -2652,6 +2757,7 @@ mod tests {
vec![
ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![UserInput::Text {
text: "apply patch".into(),
text_elements: Vec::new(),
@@ -2681,6 +2787,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "first".into(),
images: None,
text_elements: Vec::new(),
@@ -2702,6 +2809,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "second".into(),
images: None,
text_elements: Vec::new(),
@@ -2751,6 +2859,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "first".into(),
images: None,
text_elements: Vec::new(),
@@ -2772,6 +2881,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "second".into(),
images: None,
text_elements: Vec::new(),
@@ -2846,6 +2956,7 @@ mod tests {
fn reconstructs_collab_resume_end_item() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "resume agent".into(),
images: None,
text_elements: Vec::new(),
@@ -2904,6 +3015,7 @@ mod tests {
.expect("valid receiver thread id");
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "spawn agent".into(),
images: None,
text_elements: Vec::new(),
@@ -2966,6 +3078,7 @@ mod tests {
.expect("valid receiver thread id");
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "redirect".into(),
images: None,
text_elements: Vec::new(),
@@ -3030,6 +3143,7 @@ mod tests {
fn rollback_failed_error_does_not_mark_turn_failed() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello".into(),
images: None,
text_elements: Vec::new(),
@@ -3068,6 +3182,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello".into(),
images: None,
text_elements: Vec::new(),
@@ -3105,6 +3220,7 @@ mod tests {
items_view: TurnItemsView::Full,
items: vec![ThreadItem::UserMessage {
id: "item-1".into(),
client_id: None,
content: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
@@ -3125,6 +3241,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello".into(),
images: None,
text_elements: Vec::new(),
@@ -3184,6 +3301,7 @@ mod tests {
collaboration_mode_kind: Default::default(),
})),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello".into(),
images: None,
text_elements: Vec::new(),
@@ -212,7 +212,11 @@ impl CommandAction {
pub enum ThreadItem {
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
UserMessage { id: String, content: Vec<UserInput> },
UserMessage {
id: String,
client_id: Option<String>,
content: Vec<UserInput>,
},
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
HookPrompt {
@@ -776,6 +780,7 @@ impl From<CoreTurnItem> for ThreadItem {
match value {
CoreTurnItem::UserMessage(user) => ThreadItem::UserMessage {
id: user.id,
client_id: user.client_id,
content: user.content.into_iter().map(UserInput::from).collect(),
},
CoreTurnItem::HookPrompt(hook_prompt) => ThreadItem::HookPrompt {
@@ -1756,6 +1756,7 @@ fn client_request_turn_start_granular_approval_policy_is_marked_experimental() {
request_id: crate::RequestId::Integer(4),
params: TurnStartParams {
thread_id: "thr_123".to_string(),
client_user_message_id: None,
input: Vec::new(),
approval_policy: Some(AskForApproval::Granular {
sandbox_approval: false,
@@ -2314,6 +2315,7 @@ fn network_requirements_serializes_canonical_and_legacy_fields() {
fn core_turn_item_into_thread_item_converts_supported_variants() {
let user_item = TurnItem::UserMessage(UserMessageItem {
id: "user-1".to_string(),
client_id: Some("client-message-1".to_string()),
content: vec![
CoreUserInput::Text {
text: "hello".to_string(),
@@ -2342,6 +2344,7 @@ fn core_turn_item_into_thread_item_converts_supported_variants() {
ThreadItem::from(user_item),
ThreadItem::UserMessage {
id: "user-1".to_string(),
client_id: Some("client-message-1".to_string()),
content: vec![
UserInput::Text {
text: "hello".to_string(),
@@ -3582,6 +3585,7 @@ fn turn_start_params_preserve_explicit_null_service_tier() {
let without_override = TurnStartParams {
thread_id: "thread_123".to_string(),
client_user_message_id: None,
input: vec![],
responsesapi_client_metadata: None,
additional_context: None,
@@ -65,6 +65,8 @@ pub struct AdditionalContextEntry {
#[ts(export_to = "v2/")]
pub struct TurnStartParams {
pub thread_id: String,
#[ts(optional = nullable)]
pub client_user_message_id: Option<String>,
pub input: Vec<UserInput>,
/// Optional turn-scoped Responses API client metadata.
#[experimental("turn/start.responsesapiClientMetadata")]
@@ -157,6 +159,8 @@ pub struct TurnStartResponse {
#[ts(export_to = "v2/")]
pub struct TurnSteerParams {
pub thread_id: String,
#[ts(optional = nullable)]
pub client_user_message_id: Option<String>,
pub input: Vec<UserInput>,
/// Optional turn-scoped Responses API client metadata.
#[experimental("turn/steer.responsesapiClientMetadata")]
@@ -734,6 +734,7 @@ async fn trigger_zsh_fork_multi_cmd_approval(
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: message,
text_elements: Vec::new(),
@@ -818,6 +819,7 @@ async fn resume_message_v2(
let turn_response = client.turn_start(TurnStartParams {
thread_id: resume_response.thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: user_message,
text_elements: Vec::new(),
@@ -959,6 +961,7 @@ async fn send_message_v2_with_policies(
println!("< thread/start response: {thread_response:?}");
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: user_message,
// Test client sends plain text without UI element ranges.
@@ -999,6 +1002,7 @@ async fn send_follow_up_v2(
let first_turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: first_message,
// Test client sends plain text without UI element ranges.
@@ -1012,6 +1016,7 @@ async fn send_follow_up_v2(
let follow_up_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: follow_up_message,
// Test client sends plain text without UI element ranges.
@@ -1255,6 +1260,7 @@ fn live_elicitation_timeout_pause(
let started_at = Instant::now();
let turn_response = client.turn_start(TurnStartParams {
thread_id: thread_id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: prompt,
text_elements: Vec::new(),
+5 -3
View File
@@ -158,9 +158,9 @@ Example with notification opt-out:
- `thread/shellCommand` — run a user-initiated `!` shell command against a thread; this runs unsandboxed with full access rather than inheriting the thread sandbox policy. Returns `{}` immediately while progress streams through standard turn/item notifications and any active turn receives the formatted output in its message stream.
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. Experimental `runtimeWorkspaceRoots` replaces the thread-scoped runtime workspace roots used to materialize `:workspace_roots`; relative paths resolve against the effective turn cwd. Prefer experimental `permissions` profile selection by id for permission overrides; the legacy `sandboxPolicy` field is still accepted but cannot be combined with `permissions`. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. `clientUserMessageId` is optional; when supplied, the corresponding `userMessage` item echoes it as `clientId`. Experimental `runtimeWorkspaceRoots` replaces the thread-scoped runtime workspace roots used to materialize `:workspace_roots`; relative paths resolve against the effective turn cwd. Prefer experimental `permissions` profile selection by id for permission overrides; the legacy `sandboxPolicy` field is still accepted but cannot be combined with `permissions`. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `thread/inject_items` — append raw Responses API items to a loaded threads model-visible history without starting a user turn; returns `{}` on success.
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. Review and manual compaction turns reject `turn/steer`.
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. `clientUserMessageId` is optional; when supplied, the corresponding `userMessage` item echoes it as `clientId`. Review and manual compaction turns reject `turn/steer`.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); pass `outputModality: "text"` or `outputModality: "audio"` to choose model output, returns `{}` and streams `thread/realtime/*` notifications. Omit `transport` for the websocket transport, or pass `{ "type": "webrtc", "sdp": "..." }` to create a WebRTC session from a browser-generated SDP offer; the remote answer SDP is emitted as `thread/realtime/sdp`.
- `thread/realtime/appendAudio` — append an input audio chunk to the active realtime session (experimental); returns `{}`.
@@ -662,6 +662,7 @@ You can optionally specify config overrides on the new turn. If specified, these
```json
{ "method": "turn/start", "id": 30, "params": {
"threadId": "thr_123",
"clientUserMessageId": "client_msg_123",
"input": [ { "type": "text", "text": "Run tests" } ],
// Below are optional config overrides
"cwd": "/Users/me/project",
@@ -860,6 +861,7 @@ not emit `turn/started` and does not accept thread settings overrides.
```json
{ "method": "turn/steer", "id": 32, "params": {
"threadId": "thr_123",
"clientUserMessageId": "client_msg_124",
"input": [ { "type": "text", "text": "Actually focus on failing tests first." } ],
"expectedTurnId": "turn_456"
} }
@@ -1245,7 +1247,7 @@ Today both notifications carry an empty `items` array even when item events were
`ThreadItem` is the tagged union carried in turn responses and `item/*` notifications. Currently we support events for the following items:
- `userMessage``{id, content}` where `content` is a list of user inputs (`text`, `image`, or `localImage`).
- `userMessage``{id, clientId, content}` where `clientId` is the optional `clientUserMessageId` supplied to `turn/start` or `turn/steer`, and `content` is a list of user inputs (`text`, `image`, or `localImage`).
- `agentMessage``{id, text}` containing the accumulated agent reply.
- `plan``{id, text}` emitted for plan-mode turns; plan text can stream via `item/plan/delta` (experimental).
- `reasoning``{id, summary, content}` where `summary` holds streamed reasoning summaries (applicable for most OpenAI models) and `content` holds raw reasoning blocks (applicable for e.g. open source models).
@@ -2153,6 +2153,7 @@ mod tests {
let created_at = Utc::now();
let history_items = vec![
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "before rollback".to_string(),
images: None,
local_images: Vec::new(),
@@ -3228,6 +3229,7 @@ mod tests {
state.track_current_turn_event(
"turn-1",
&EventMsg::UserMessage(codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "already tracked".to_string(),
images: None,
local_images: Vec::new(),
@@ -653,6 +653,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
params: TurnStartParams {
environments: None,
thread_id,
client_user_message_id: None,
input: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
@@ -213,6 +213,7 @@ mod thread_processor_behavior_tests {
fn thread_turns_list_merges_in_progress_active_turn_before_agent_status_running() {
let persisted_items = vec![RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "persisted".to_string(),
images: None,
local_images: Vec::new(),
@@ -224,6 +225,7 @@ mod thread_processor_behavior_tests {
id: "live-turn".to_string(),
items: vec![ThreadItem::UserMessage {
id: "live-user-message".to_string(),
client_id: None,
content: vec![V2UserInput::Text {
text: "live".to_string(),
text_elements: Vec::new(),
@@ -148,6 +148,7 @@ mod tests {
fn token_usage_history() -> Vec<RolloutItem> {
vec![
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "first turn".to_string(),
images: None,
local_images: Vec::new(),
@@ -164,6 +165,7 @@ mod tests {
rate_limits: None,
})),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "second turn".to_string(),
images: None,
local_images: Vec::new(),
@@ -416,6 +416,7 @@ impl TurnRequestProcessor {
.into_iter()
.map(V2UserInput::into_core)
.collect();
let client_user_message_id = params.client_user_message_id;
let additional_context = map_additional_context(params.additional_context);
let turn_has_input = !mapped_items.is_empty();
let thread_settings = self
@@ -448,8 +449,12 @@ impl TurnRequestProcessor {
additional_context,
thread_settings,
};
let turn_id = self
.submit_core_op(&request_id, thread.as_ref(), turn_op)
let turn_id = thread
.submit_user_input_with_client_user_message_id(
turn_op,
self.request_trace_context(&request_id).await,
client_user_message_id,
)
.await
.map_err(|err| {
let error = internal_error(format!("failed to start turn: {err}"));
@@ -780,6 +785,7 @@ impl TurnRequestProcessor {
mapped_items,
additional_context,
Some(&params.expected_turn_id),
params.client_user_message_id,
params.responsesapi_client_metadata,
)
.await
@@ -989,6 +995,7 @@ impl TurnRequestProcessor {
} else {
vec![ThreadItem::UserMessage {
id: turn_id.clone(),
client_id: None,
content: vec![V2UserInput::Text {
text: display_text.to_string(),
// Review prompt display text is synthesized; no UI element ranges to preserve.
@@ -506,6 +506,7 @@ async fn external_auth_refreshes_on_unauthorized() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(codex_app_server_protocol::TurnStartParams {
thread_id: thread.thread.id,
client_user_message_id: None,
input: vec![codex_app_server_protocol::UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -612,6 +613,7 @@ async fn external_auth_refresh_error_fails_turn() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(codex_app_server_protocol::TurnStartParams {
thread_id: thread.thread.id.clone(),
client_user_message_id: None,
input: vec![codex_app_server_protocol::UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -734,6 +736,7 @@ async fn external_auth_refresh_mismatched_workspace_fails_turn() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(codex_app_server_protocol::TurnStartParams {
thread_id: thread.thread.id.clone(),
client_user_message_id: None,
input: vec![codex_app_server_protocol::UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -849,6 +852,7 @@ async fn external_auth_refresh_invalid_access_token_fails_turn() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(codex_app_server_protocol::TurnStartParams {
thread_id: thread.thread.id.clone(),
client_user_message_id: None,
input: vec![codex_app_server_protocol::UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -110,6 +110,7 @@ async fn attestation_generate_round_trip_adds_header_to_responses_websocket_hand
let turn_request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -73,6 +73,7 @@ async fn turn_start_forwards_client_metadata_to_responses_request_v2() -> Result
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -153,6 +154,7 @@ async fn turn_start_sends_fork_lineage_in_turn_metadata_for_thread_fork_v2() ->
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Continue".to_string(),
text_elements: Vec::new(),
@@ -333,6 +335,7 @@ async fn turn_steer_updates_client_metadata_on_follow_up_responses_request_v2()
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Run sleep".to_string(),
text_elements: Vec::new(),
@@ -363,6 +366,7 @@ async fn turn_steer_updates_client_metadata_on_follow_up_responses_request_v2()
let steer_req = mcp
.send_turn_steer_request(TurnSteerParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Focus on the failure".to_string(),
text_elements: Vec::new(),
@@ -458,6 +462,7 @@ async fn turn_start_forwards_client_metadata_to_responses_websocket_request_body
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -394,6 +394,7 @@ async fn send_turn_and_wait(mcp: &mut McpProcess, thread_id: &str, text: &str) -
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.to_string(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
@@ -226,6 +226,7 @@ async fn send_turn_start_request(stream: &mut WsClient, id: i64, thread_id: &str
id,
Some(serde_json::to_value(TurnStartParams {
thread_id: thread_id.to_string(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -89,6 +89,7 @@ async fn thread_start_injects_dynamic_tools_into_model_requests() -> Result<()>
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -168,6 +169,7 @@ async fn thread_start_keeps_hidden_dynamic_tools_out_of_model_requests() -> Resu
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -347,6 +349,7 @@ async fn dynamic_tool_call_round_trip_sends_text_content_items_to_model() -> Res
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Run the tool".to_string(),
text_elements: Vec::new(),
@@ -521,6 +524,7 @@ async fn dynamic_tool_call_round_trip_sends_content_items_to_model() -> Result<(
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Run the tool".to_string(),
text_elements: Vec::new(),
@@ -391,6 +391,7 @@ async fn external_agent_config_import_creates_session_rollouts() -> Result<()> {
let request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "follow up".to_string(),
text_elements: Vec::new(),
@@ -964,6 +965,7 @@ async fn external_agent_config_import_compacts_huge_session_before_first_follow_
let request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "follow up".to_string(),
text_elements: Vec::new(),
@@ -662,6 +662,7 @@ command = "python3 {hook_script_path}"
let first_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "first turn".to_string(),
text_elements: Vec::new(),
@@ -723,6 +724,7 @@ command = "python3 {hook_script_path}"
let second_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "second turn".to_string(),
text_elements: Vec::new(),
@@ -792,6 +794,7 @@ command = "python3 {hook_script_path}"
let third_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "third turn".to_string(),
text_elements: Vec::new(),
@@ -932,6 +935,7 @@ command = "python3 {hook_script_path}"
let first_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "first turn".to_string(),
text_elements: Vec::new(),
@@ -983,6 +987,7 @@ command = "python3 {hook_script_path}"
let second_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "second turn".to_string(),
text_elements: Vec::new(),
@@ -306,6 +306,7 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -135,6 +135,7 @@ async fn mcp_server_elicitation_round_trip() -> Result<()> {
let warmup_turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Warm up connectors.".to_string(),
text_elements: Vec::new(),
@@ -167,6 +168,7 @@ async fn mcp_server_elicitation_round_trip() -> Result<()> {
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Use [$calendar](app://calendar) to run the calendar tool.".to_string(),
text_elements: Vec::new(),
@@ -461,6 +461,7 @@ url = "{mcp_server_url}/mcp"
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Call the large MCP tool".to_string(),
text_elements: Vec::new(),
@@ -59,6 +59,7 @@ async fn turn_start_accepts_output_schema_v2() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -141,6 +142,7 @@ async fn turn_start_output_schema_is_per_turn_v2() -> Result<()> {
let turn_req_1 = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -183,6 +185,7 @@ async fn turn_start_output_schema_is_per_turn_v2() -> Result<()> {
let turn_req_2 = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello again".to_string(),
text_elements: Vec::new(),
@@ -152,6 +152,7 @@ async fn start_plan_mode_turn(mcp: &mut McpProcess) -> Result<codex_app_server_p
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Plan this".to_string(),
text_elements: Vec::new(),
@@ -113,6 +113,7 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste
request_id: RequestId::Integer(2),
params: TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -49,6 +49,7 @@ async fn request_permissions_round_trip() -> Result<()> {
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "pick a directory".to_string(),
text_elements: Vec::new(),
@@ -51,6 +51,7 @@ async fn request_user_input_round_trip() -> Result<()> {
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "ask something".to_string(),
text_elements: Vec::new(),
@@ -91,6 +91,7 @@ async fn review_start_runs_review_turn_and_emits_code_review_item() -> Result<()
turn.items,
vec![ThreadItem::UserMessage {
id: turn_id.clone(),
client_id: None,
content: vec![V2UserInput::Text {
text: "commit 1234567: Tidy UI colors".to_string(),
text_elements: Vec::new(),
@@ -199,6 +200,7 @@ async fn review_start_exec_approval_item_id_matches_command_execution_item() ->
turn.items,
vec![ThreadItem::UserMessage {
id: turn_id.clone(),
client_id: None,
content: vec![V2UserInput::Text {
text: "commit 1234567: Check review approvals".to_string(),
text_elements: Vec::new(),
@@ -328,6 +330,7 @@ async fn review_start_with_detached_delivery_returns_new_thread_id() -> Result<(
turn.items,
vec![ThreadItem::UserMessage {
id: turn.id.clone(),
client_id: None,
content: vec![V2UserInput::Text {
text: "detached review".to_string(),
text_elements: Vec::new(),
@@ -465,6 +468,7 @@ async fn materialize_thread_rollout(mcp: &mut McpProcess, thread_id: &str) -> Re
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.to_string(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "materialize rollout".to_string(),
text_elements: Vec::new(),
@@ -67,6 +67,7 @@ async fn openai_model_header_mismatch_emits_model_rerouted_notification_v2() ->
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "trigger safeguard".to_string(),
text_elements: Vec::new(),
@@ -133,6 +134,7 @@ async fn cyber_policy_response_emits_typed_error_notification_v2() -> Result<()>
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "trigger cyber policy error".to_string(),
text_elements: Vec::new(),
@@ -209,6 +211,7 @@ async fn response_model_field_mismatch_emits_model_rerouted_notification_v2_when
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "trigger response model check".to_string(),
text_elements: Vec::new(),
@@ -277,6 +280,7 @@ async fn model_verification_emits_typed_notification_and_warning_v2() -> Result<
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "trigger model verification".to_string(),
text_elements: Vec::new(),
@@ -93,6 +93,7 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "materialize".to_string(),
text_elements: Vec::new(),
@@ -517,6 +518,7 @@ async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()>
let turn_start_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "materialize".to_string(),
text_elements: Vec::new(),
@@ -594,6 +596,7 @@ async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()>
let resumed_turn_id = secondary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![UserInput::Text {
text: "secondary turn".to_string(),
text_elements: Vec::new(),
@@ -747,6 +747,7 @@ async fn thread_fork_ephemeral_remains_pathless_and_omits_listing() -> Result<()
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: fork_thread_id,
client_user_message_id: None,
input: vec![UserInput::Text {
text: "continue".to_string(),
text_elements: Vec::new(),
@@ -92,6 +92,7 @@ async fn thread_inject_items_adds_raw_response_items_to_thread_history() -> Resu
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -170,6 +171,7 @@ async fn thread_inject_items_adds_raw_response_items_after_a_turn() -> Result<()
let first_turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "First turn".to_string(),
text_elements: Vec::new(),
@@ -215,6 +217,7 @@ async fn thread_inject_items_adds_raw_response_items_after_a_turn() -> Result<()
let second_turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Second turn".to_string(),
text_elements: Vec::new(),
@@ -227,6 +227,7 @@ async fn thread_list_reports_system_error_idle_flag_after_failed_turn() -> Resul
let seed_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
@@ -249,6 +250,7 @@ async fn thread_list_reports_system_error_idle_flag_after_failed_turn() -> Resul
let failed_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "fail turn".to_string(),
text_elements: Vec::new(),
@@ -1198,6 +1198,7 @@ async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Resul
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "fail this turn".to_string(),
text_elements: Vec::new(),
@@ -1391,6 +1392,7 @@ async fn seed_pathless_store_thread(
fn store_history_items() -> Vec<RolloutItem> {
vec![RolloutItem::EventMsg(EventMsg::UserMessage(
UserMessageEvent {
client_id: None,
message: "history from store".to_string(),
images: None,
local_images: Vec::new(),
@@ -211,6 +211,7 @@ async fn thread_resume_with_empty_path_uses_running_thread_id() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "materialize rollout".to_string(),
text_elements: Vec::new(),
@@ -279,6 +280,7 @@ async fn turn_start_updates_runtime_workspace_roots_for_loaded_thread() -> Resul
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -809,6 +811,7 @@ async fn thread_resume_keeps_paused_goal_paused() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "materialize this thread".to_string(),
text_elements: Vec::new(),
@@ -913,6 +916,7 @@ async fn thread_goal_set_preserves_budget_limited_same_objective() -> Result<()>
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "materialize this thread".to_string(),
text_elements: Vec::new(),
@@ -1011,6 +1015,7 @@ async fn thread_goal_set_persists_resumable_stopped_statuses() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "materialize this thread".to_string(),
text_elements: Vec::new(),
@@ -1206,6 +1211,7 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "materialize this thread".to_string(),
text_elements: Vec::new(),
@@ -2074,6 +2080,7 @@ async fn thread_resume_keeps_in_flight_turn_streaming() -> Result<()> {
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
@@ -2099,6 +2106,7 @@ async fn thread_resume_keeps_in_flight_turn_streaming() -> Result<()> {
let turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "respond with docs".to_string(),
text_elements: Vec::new(),
@@ -2181,6 +2189,7 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
@@ -2204,6 +2213,7 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
let running_turn_request_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "keep running".to_string(),
text_elements: Vec::new(),
@@ -2297,6 +2307,7 @@ async fn thread_resume_rejects_mismatched_path_for_running_thread_id() -> Result
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
@@ -2320,6 +2331,7 @@ async fn thread_resume_rejects_mismatched_path_for_running_thread_id() -> Result
let running_turn_request_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "keep running".to_string(),
text_elements: Vec::new(),
@@ -2433,6 +2445,7 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
@@ -2455,6 +2468,7 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R
let running_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "keep running".to_string(),
text_elements: Vec::new(),
@@ -2562,6 +2576,7 @@ async fn thread_resume_can_skip_turns_when_thread_is_running() -> Result<()> {
let turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
@@ -2645,6 +2660,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
@@ -2667,6 +2683,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
let running_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "run command".to_string(),
text_elements: Vec::new(),
@@ -2783,6 +2800,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
@@ -2806,6 +2824,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
let running_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "apply patch".to_string(),
text_elements: Vec::new(),
@@ -2950,6 +2969,7 @@ async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Re
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: resumed_thread.id,
client_user_message_id: None,
input: vec![UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -3262,6 +3282,7 @@ async fn start_materialized_thread_and_restart(
let materialize_turn_id = first_mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: seed_text.to_string(),
text_elements: Vec::new(),
@@ -3351,6 +3372,7 @@ async fn thread_resume_accepts_personality_override() -> Result<()> {
let materialize_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
@@ -3391,6 +3413,7 @@ async fn thread_resume_accepts_personality_override() -> Result<()> {
let turn_id = secondary
.send_turn_start_request(TurnStartParams {
thread_id: resume.thread.id,
client_user_message_id: None,
input: vec![UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -57,6 +57,7 @@ async fn thread_rollback_drops_last_turns_and_persists_to_rollout() -> Result<()
let turn1_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: first_text.to_string(),
text_elements: Vec::new(),
@@ -78,6 +79,7 @@ async fn thread_rollback_drops_last_turns_and_persists_to_rollout() -> Result<()
let turn2_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Second".to_string(),
text_elements: Vec::new(),
@@ -259,6 +259,7 @@ async fn turn_start_settings_override_emits_thread_settings_updated() -> Result<
let turn_request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
@@ -275,6 +275,7 @@ async fn thread_shell_command_uses_existing_active_turn() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run python".to_string(),
text_elements: Vec::new(),
@@ -48,6 +48,7 @@ async fn thread_status_changed_emits_runtime_updates() -> Result<()> {
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "collect status updates".to_string(),
text_elements: Vec::new(),
@@ -171,6 +172,7 @@ async fn thread_status_changed_can_be_opted_out() -> Result<()> {
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run once".to_string(),
text_elements: Vec::new(),
@@ -81,6 +81,7 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "materialize".to_string(),
text_elements: Vec::new(),
@@ -151,6 +151,7 @@ async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run deterministic tool".to_string(),
text_elements: Vec::new(),
@@ -260,6 +261,7 @@ async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Resu
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "fail this turn".to_string(),
text_elements: Vec::new(),
@@ -78,6 +78,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run sleep".to_string(),
text_elements: Vec::new(),
@@ -159,6 +160,7 @@ async fn turn_interrupt_rejects_completed_turn() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "say done".to_string(),
text_elements: Vec::new(),
@@ -253,6 +255,7 @@ async fn turn_interrupt_resolves_pending_command_approval_request() -> Result<()
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run python".to_string(),
text_elements: Vec::new(),
@@ -141,6 +141,7 @@ async fn run_local_image_turn(detail: Option<ImageDetail>) -> Result<Vec<Value>>
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::LocalImage {
path: image_path,
detail,
@@ -235,6 +236,7 @@ async fn turn_start_with_empty_input_runs_model_request() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: Vec::new(),
..Default::default()
})
@@ -335,6 +337,7 @@ async fn turn_start_additional_context_flows_to_model_input() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "inspect tab".to_string(),
text_elements: Vec::new(),
@@ -420,6 +423,7 @@ async fn turn_start_sends_originator_header() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -492,6 +496,7 @@ async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: Some("client-message-1".to_string()),
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: text_elements.clone(),
@@ -521,7 +526,10 @@ async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
.await??;
match user_message_item {
ThreadItem::UserMessage { content, .. } => {
ThreadItem::UserMessage {
client_id, content, ..
} => {
assert_eq!(client_id, Some("client-message-1".to_string()));
assert_eq!(
content,
vec![V2UserInput::Text {
@@ -595,6 +603,7 @@ async fn turn_start_emits_thread_scoped_warning_notification_for_trimmed_skills(
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -761,6 +770,7 @@ async fn thread_start_omits_empty_instruction_overrides_from_model_request() ->
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -840,6 +850,7 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Image {
url: "https://example.com/a.png".to_string(),
detail: None,
@@ -928,6 +939,7 @@ async fn turn_start_accepts_text_at_limit_with_mention_item() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![
V2UserInput::Text {
text: "x".repeat(MAX_USER_INPUT_TEXT_CHARS),
@@ -991,6 +1003,7 @@ async fn turn_start_rejects_combined_oversized_text_input() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![
V2UserInput::Text {
text: first,
@@ -1065,6 +1078,7 @@ async fn turn_start_rejects_invalid_permission_selection_before_starting_turn()
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -1137,6 +1151,7 @@ async fn turn_start_rejects_unknown_environment_before_starting_turn() -> Result
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -1210,6 +1225,7 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<(
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -1262,6 +1278,7 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<(
let turn_req2 = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Second".to_string(),
text_elements: Vec::new(),
@@ -1360,6 +1377,7 @@ async fn turn_start_accepts_collaboration_mode_override_v2() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -1449,6 +1467,7 @@ async fn turn_start_uses_thread_feature_overrides_for_request_user_input_tool_de
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -1520,6 +1539,7 @@ async fn turn_start_accepts_personality_override_v2() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -1601,6 +1621,7 @@ async fn turn_start_change_personality_mid_thread_v2() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -1625,6 +1646,7 @@ async fn turn_start_change_personality_mid_thread_v2() -> Result<()> {
let turn_req2 = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello again".to_string(),
text_elements: Vec::new(),
@@ -1727,6 +1749,7 @@ async fn turn_start_uses_migrated_pragmatic_personality_without_override_v2() ->
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -1847,6 +1870,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
let first_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run python".to_string(),
text_elements: Vec::new(),
@@ -1911,6 +1935,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
let second_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run python again".to_string(),
text_elements: Vec::new(),
@@ -1988,6 +2013,7 @@ async fn turn_start_exec_approval_decline_v2() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run python".to_string(),
text_elements: Vec::new(),
@@ -2143,6 +2169,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
.send_turn_start_request(TurnStartParams {
environments: None,
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "first turn".to_string(),
text_elements: Vec::new(),
@@ -2186,6 +2213,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
.send_turn_start_request(TurnStartParams {
environments: None,
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "second turn".to_string(),
text_elements: Vec::new(),
@@ -2328,6 +2356,7 @@ stream_max_retries = 0
let first_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "select dev profile".to_string(),
text_elements: Vec::new(),
@@ -2351,6 +2380,7 @@ stream_max_retries = 0
let second_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "write in new root".to_string(),
text_elements: Vec::new(),
@@ -2483,6 +2513,7 @@ async fn run_environment_selection_case(
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: format!("run {}", case.name),
text_elements: Vec::new(),
@@ -2598,6 +2629,7 @@ async fn turn_start_file_change_approval_v2() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "apply patch".into(),
text_elements: Vec::new(),
@@ -2788,6 +2820,7 @@ async fn turn_start_does_not_stream_apply_patch_change_updates_without_feature_v
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "apply patch".into(),
text_elements: Vec::new(),
@@ -2925,6 +2958,7 @@ async fn turn_start_streams_apply_patch_change_updates_v2() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "apply patch".into(),
text_elements: Vec::new(),
@@ -3055,6 +3089,7 @@ async fn turn_start_emits_spawn_agent_item_with_model_metadata_v2() -> Result<()
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: PARENT_PROMPT.to_string(),
text_elements: Vec::new(),
@@ -3274,6 +3309,7 @@ config_file = "./custom-role.toml"
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: PARENT_PROMPT.to_string(),
text_elements: Vec::new(),
@@ -3419,6 +3455,7 @@ async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Res
let turn_1_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "apply patch 1".into(),
text_elements: Vec::new(),
@@ -3491,6 +3528,7 @@ async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Res
let turn_2_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "apply patch 2".into(),
text_elements: Vec::new(),
@@ -3589,6 +3627,7 @@ async fn turn_start_file_change_approval_decline_v2() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "apply patch".into(),
text_elements: Vec::new(),
@@ -3734,6 +3773,7 @@ async fn command_execution_notifications_include_process_id() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run a command".to_string(),
text_elements: Vec::new(),
@@ -117,6 +117,7 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run echo hi".to_string(),
text_elements: Vec::new(),
@@ -235,6 +236,7 @@ async fn turn_start_shell_zsh_fork_exec_approval_decline_v2() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run python".to_string(),
text_elements: Vec::new(),
@@ -367,6 +369,7 @@ async fn turn_start_shell_zsh_fork_exec_approval_cancel_v2() -> Result<()> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run python".to_string(),
text_elements: Vec::new(),
@@ -525,6 +528,7 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "remove both files".to_string(),
text_elements: Vec::new(),
@@ -12,10 +12,12 @@ use codex_app_server::INPUT_TOO_LARGE_ERROR_CODE;
use codex_app_server::INVALID_PARAMS_ERROR_CODE;
use codex_app_server_protocol::AdditionalContextEntry;
use codex_app_server_protocol::AdditionalContextKind;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
@@ -67,6 +69,7 @@ async fn turn_steer_requires_active_turn() -> Result<()> {
let steer_req = mcp
.send_turn_steer_request(TurnSteerParams {
thread_id: thread.id.clone(),
client_user_message_id: Some("client-steer-message-1".to_string()),
input: vec![V2UserInput::Text {
text: "steer".to_string(),
text_elements: Vec::new(),
@@ -152,6 +155,7 @@ async fn turn_steer_rejects_oversized_text_input() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run sleep".to_string(),
text_elements: Vec::new(),
@@ -177,6 +181,7 @@ async fn turn_steer_rejects_oversized_text_input() -> Result<()> {
let steer_req = mcp
.send_turn_steer_request(TurnSteerParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: oversized_input.clone(),
text_elements: Vec::new(),
@@ -217,10 +222,10 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 10".to_string(),
"Start-Sleep -Seconds 2".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "10".to_string()];
let shell_command = vec!["sleep".to_string(), "2".to_string()];
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
@@ -228,14 +233,16 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory)?;
let server =
create_mock_responses_server_sequence_unchecked(vec![create_shell_command_sse_response(
let server = create_mock_responses_server_sequence_unchecked(vec![
create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep",
)?])
.await;
)?,
app_test_support::create_final_assistant_message_sse_response("Done")?,
])
.await;
write_mock_responses_config_toml_with_chatgpt_base_url(
&codex_home,
&server.uri(),
@@ -262,6 +269,7 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run sleep".to_string(),
text_elements: Vec::new(),
@@ -286,6 +294,7 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
let steer_req = mcp
.send_turn_steer_request(TurnSteerParams {
thread_id: thread.id.clone(),
client_user_message_id: Some("client-steer-message-1".to_string()),
input: vec![V2UserInput::Text {
text: "steer".to_string(),
text_elements: Vec::new(),
@@ -303,6 +312,34 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
let steer: TurnSteerResponse = to_response::<TurnSteerResponse>(steer_resp)?;
assert_eq!(steer.turn_id, turn.id);
timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let notification = mcp
.read_stream_until_notification_message("item/started")
.await?;
let params = notification.params.expect("item/started params");
let item_started: ItemStartedNotification =
serde_json::from_value(params).expect("deserialize item/started notification");
let ThreadItem::UserMessage {
client_id, content, ..
} = item_started.item
else {
continue;
};
if client_id == Some("client-steer-message-1".to_string()) {
assert_eq!(
content,
vec![V2UserInput::Text {
text: "steer".to_string(),
text_elements: Vec::new(),
}]
);
return Ok::<(), anyhow::Error>(());
}
}
})
.await??;
let event =
wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_steer_event").await?;
assert_eq!(event["event_params"]["thread_id"], thread.id);
@@ -316,8 +353,11 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
serde_json::Value::Null
);
mcp.interrupt_turn_and_wait_for_aborted(thread.id, steer.turn_id, DEFAULT_READ_TIMEOUT)
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
@@ -366,6 +406,7 @@ async fn turn_steer_rejects_context_only_input_without_merging_context() -> Resu
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "run sleep".to_string(),
text_elements: Vec::new(),
@@ -396,6 +437,7 @@ async fn turn_steer_rejects_context_only_input_without_merging_context() -> Resu
let steer_req = mcp
.send_turn_steer_request(TurnSteerParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: Vec::new(),
responsesapi_client_metadata: None,
additional_context,
@@ -88,6 +88,7 @@ async fn standalone_web_search_round_trips_encrypted_output() -> Result<()> {
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "Search the web".to_string(),
text_elements: Vec::new(),
+1 -1
View File
@@ -132,7 +132,7 @@ pub fn collect_explicit_skill_mentions(
let mut blocked_plain_names: HashSet<String> = HashSet::new();
for input in inputs {
if let UserInput::Skill { name, path } = input {
if let UserInput::Skill { name, path, .. } = input {
blocked_plain_names.insert(name.clone());
let Ok(path) = AbsolutePathBuf::relative_to_current_dir(path) else {
continue;
+4 -2
View File
@@ -1296,8 +1296,10 @@ pub(crate) fn render_input_preview(initial_operation: &Op) -> String {
UserInput::LocalImage { path, .. } => {
format!("[local_image:{}]", path.display())
}
UserInput::Skill { name, path } => format!("[skill:${name}]({})", path.display()),
UserInput::Mention { name, path } => format!("[mention:${name}]({path})"),
UserInput::Skill { name, path, .. } => {
format!("[skill:${name}]({})", path.display())
}
UserInput::Mention { name, path, .. } => format!("[mention:${name}]({path})"),
_ => "[input]".to_string(),
})
.collect::<Vec<_>>()
+1
View File
@@ -218,6 +218,7 @@ pub(crate) async fn run_codex_thread_one_shot(
.send(Submission {
id: "shutdown".to_string(),
op: Op::Shutdown {},
client_user_message_id: None,
trace: None,
})
.await;
@@ -129,6 +129,7 @@ async fn forward_ops_preserves_submission_trace_context() {
let submission = Submission {
id: "sub-1".to_string(),
op: Op::Interrupt,
client_user_message_id: None,
trace: Some(codex_protocol::protocol::W3cTraceContext {
traceparent: Some(
"00-1234567890abcdef1234567890abcdef-1234567890abcdef-01".to_string(),
+13
View File
@@ -230,6 +230,17 @@ impl CodexThread {
self.codex.submit_with_trace(op, trace).await
}
pub async fn submit_user_input_with_client_user_message_id(
&self,
op: Op,
trace: Option<W3cTraceContext>,
client_user_message_id: Option<String>,
) -> CodexResult<String> {
self.codex
.submit_user_input_with_client_user_message_id(op, trace, client_user_message_id)
.await
}
/// Persist whether this thread is eligible for future memory generation.
pub async fn set_thread_memory_mode(&self, mode: ThreadMemoryMode) -> anyhow::Result<()> {
self.codex.set_thread_memory_mode(mode).await
@@ -240,6 +251,7 @@ impl CodexThread {
input: Vec<UserInput>,
additional_context: BTreeMap<String, AdditionalContextEntry>,
expected_turn_id: Option<&str>,
client_user_message_id: Option<String>,
responsesapi_client_metadata: Option<HashMap<String, String>>,
) -> Result<String, SteerInputError> {
self.codex
@@ -247,6 +259,7 @@ impl CodexThread {
input,
additional_context,
expected_turn_id,
client_user_message_id,
responsesapi_client_metadata,
)
.await
+8 -4
View File
@@ -500,7 +500,7 @@ pub(crate) async fn inspect_pending_input(
pending_input_item: &TurnInput,
) -> HookRuntimeOutcome {
match pending_input_item {
TurnInput::UserInput(content) => {
TurnInput::UserInput { content, .. } => {
let request = UserPromptSubmitRequest {
session_id: sess.session_id().into(),
turn_id: turn_context.sub_id.clone(),
@@ -536,9 +536,13 @@ pub(crate) async fn record_pending_input(
additional_contexts: Vec<String>,
) {
match pending_input {
TurnInput::UserInput(content) => {
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), content.as_slice())
.await;
TurnInput::UserInput { content, client_id } => {
sess.record_user_prompt_and_emit_turn_item(
turn_context.as_ref(),
content.as_slice(),
client_id,
)
.await;
}
TurnInput::ResponseItem(item) => {
sess.record_conversation_items(turn_context, std::slice::from_ref(&item))
@@ -68,6 +68,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R
let user_event = RolloutLine {
timestamp: TEST_TIMESTAMP.to_string(),
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello".to_string(),
images: None,
local_images: Vec::new(),
+15 -3
View File
@@ -85,12 +85,18 @@ pub async fn realtime_conversation_list_voices(sess: &Session, sub_id: String) {
.await;
}
pub async fn user_input_or_turn(sess: &Arc<Session>, sub_id: String, op: Op) {
pub async fn user_input_or_turn(
sess: &Arc<Session>,
sub_id: String,
op: Op,
client_user_message_id: Option<String>,
) {
user_input_or_turn_inner(
sess,
sub_id,
op,
/*mirror_user_text_to_realtime*/ Some(()),
client_user_message_id,
)
.await;
}
@@ -190,6 +196,7 @@ pub(super) async fn user_input_or_turn_inner(
sub_id: String,
op: Op,
mirror_user_text_to_realtime: Option<()>,
client_user_message_id: Option<String>,
) {
let Op::UserInput {
items,
@@ -229,6 +236,7 @@ pub(super) async fn user_input_or_turn_inner(
items.clone(),
additional_context.clone(),
/*expected_turn_id*/ None,
client_user_message_id.clone(),
responsesapi_client_metadata.clone(),
)
.await
@@ -260,7 +268,10 @@ pub(super) async fn user_input_or_turn_inner(
.map(TurnInput::ResponseItem)
.collect::<Vec<_>>();
if !items.is_empty() {
task_input.push(TurnInput::UserInput(items));
task_input.push(TurnInput::UserInput {
content: items,
client_id: client_user_message_id,
});
}
sess.spawn_task(
Arc::clone(&current_context),
@@ -773,7 +784,8 @@ pub(super) async fn submission_loop(
false
}
Op::UserInput { .. } => {
user_input_or_turn(&sess, sub.id.clone(), sub.op).await;
user_input_or_turn(&sess, sub.id.clone(), sub.op, sub.client_user_message_id)
.await;
false
}
Op::ThreadSettings { thread_settings } => {
+4 -1
View File
@@ -11,7 +11,10 @@ use tokio::sync::watch;
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum TurnInput {
UserInput(Vec<UserInput>),
UserInput {
content: Vec<UserInput>,
client_id: Option<String>,
},
ResponseItem(ResponseItem),
}
+31 -2
View File
@@ -667,6 +667,25 @@ impl Codex {
let sub = Submission {
id: id.clone(),
op,
client_user_message_id: None,
trace,
};
self.submit_with_id(sub).await?;
Ok(id)
}
pub async fn submit_user_input_with_client_user_message_id(
&self,
op: Op,
trace: Option<W3cTraceContext>,
client_user_message_id: Option<String>,
) -> CodexResult<String> {
debug_assert!(matches!(op, Op::UserInput { .. }));
let id = Uuid::now_v7().to_string();
let sub = Submission {
id: id.clone(),
op,
client_user_message_id,
trace,
};
self.submit_with_id(sub).await?;
@@ -722,6 +741,7 @@ impl Codex {
input: Vec<UserInput>,
additional_context: BTreeMap<String, AdditionalContextEntry>,
expected_turn_id: Option<&str>,
client_user_message_id: Option<String>,
responsesapi_client_metadata: Option<HashMap<String, String>>,
) -> Result<String, SteerInputError> {
self.session
@@ -729,6 +749,7 @@ impl Codex {
input,
additional_context,
expected_turn_id,
client_user_message_id,
responsesapi_client_metadata,
)
.await
@@ -1074,6 +1095,7 @@ impl Session {
thread_settings: Default::default(),
},
/*mirror_user_text_to_realtime*/ None,
/*client_user_message_id*/ None,
)
.await;
}
@@ -3056,6 +3078,7 @@ impl Session {
&self,
turn_context: &TurnContext,
input: &[UserInput],
client_id: Option<String>,
) {
// Persist the user message to history, but emit the turn item from `UserInput` so
// UI-only `text_elements` are preserved. `ResponseItem::Message` does not carry
@@ -3063,7 +3086,9 @@ impl Session {
let response_item = ResponseItem::from(ResponseInputItem::from(input.to_vec()));
self.record_conversation_items(turn_context, std::slice::from_ref(&response_item))
.await;
let turn_item = TurnItem::UserMessage(UserMessageItem::new(input));
let mut user_message_item = UserMessageItem::new(input);
user_message_item.client_id = client_id;
let turn_item = TurnItem::UserMessage(user_message_item);
self.emit_turn_item_started(turn_context, &turn_item).await;
self.emit_turn_item_completed(turn_context, turn_item).await;
self.ensure_rollout_materialized().await;
@@ -3099,6 +3124,7 @@ impl Session {
input: Vec<UserInput>,
additional_context: BTreeMap<String, AdditionalContextEntry>,
expected_turn_id: Option<&str>,
client_user_message_id: Option<String>,
responsesapi_client_metadata: Option<HashMap<String, String>>,
) -> Result<String, SteerInputError> {
let mut active = self.active_turn.lock().await;
@@ -3155,7 +3181,10 @@ impl Session {
.map(ResponseItem::from)
.map(TurnInput::ResponseItem)
.collect::<Vec<_>>();
pending_input.push(TurnInput::UserInput(input));
pending_input.push(TurnInput::UserInput {
content: input,
client_id: client_user_message_id,
});
self.input_queue
.extend_pending_input_and_accept_mailbox_delivery_for_turn_state(
active_turn.turn_state.as_ref(),
+8 -5
View File
@@ -137,11 +137,14 @@ pub(super) async fn spawn_review_thread(
};
// Seed the child task with the review prompt as the initial user message.
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: review_prompt,
// Review prompt is synthesized; no UI element ranges to preserve.
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: review_prompt,
// Review prompt is synthesized; no UI element ranges to preserve.
text_elements: Vec::new(),
}],
client_id: None,
}];
let tc = Arc::new(review_turn_context);
tc.turn_metadata_state.spawn_git_enrichment_task();
// TODO(ccunningham): Review turns currently rely on `spawn_task` for TurnComplete but do not
@@ -130,6 +130,7 @@ async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lif
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
@@ -198,6 +199,7 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_com
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "turn 1 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -228,6 +230,7 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_com
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "turn 2 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -300,6 +303,7 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_inc
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "turn 1 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -330,6 +334,7 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_inc
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "turn 2 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -394,6 +399,7 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "turn 1 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -424,6 +430,7 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "turn 2 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -517,6 +524,7 @@ async fn reconstruct_history_rollback_counts_inter_agent_assistant_turns() {
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "turn 1 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -608,6 +616,7 @@ async fn reconstruct_history_rollback_clears_history_and_metadata_when_exceeding
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "only user".to_string(),
images: None,
local_images: Vec::new(),
@@ -662,6 +671,7 @@ async fn record_initial_history_resumed_rollback_skips_only_user_turns() {
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
@@ -737,6 +747,7 @@ async fn record_initial_history_resumed_rollback_drops_incomplete_user_turn_comp
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
@@ -765,6 +776,7 @@ async fn record_initial_history_resumed_rollback_drops_incomplete_user_turn_comp
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "rolled back".to_string(),
images: None,
local_images: Vec::new(),
@@ -898,6 +910,7 @@ async fn reconstruct_history_legacy_compaction_without_replacement_history_clear
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "after legacy compact".to_string(),
images: None,
local_images: Vec::new(),
@@ -963,6 +976,7 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
@@ -1068,6 +1082,7 @@ async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_tu
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
@@ -1096,6 +1111,7 @@ async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_tu
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "aborted".to_string(),
images: None,
local_images: Vec::new(),
@@ -1178,6 +1194,7 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
@@ -1206,6 +1223,7 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "current".to_string(),
images: None,
local_images: Vec::new(),
@@ -1297,6 +1315,7 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clea
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
@@ -1325,6 +1344,7 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clea
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "incomplete".to_string(),
images: None,
local_images: Vec::new(),
@@ -1377,6 +1397,7 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_preserves_turn_
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "incomplete".to_string(),
images: None,
local_images: Vec::new(),
@@ -1452,6 +1473,7 @@ async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clear
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "seed".to_string(),
images: None,
local_images: Vec::new(),
@@ -1480,6 +1502,7 @@ async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clear
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "compacted".to_string(),
images: None,
local_images: Vec::new(),
+113 -52
View File
@@ -2490,6 +2490,7 @@ async fn record_initial_history_forked_hydrates_previous_turn_settings() {
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "forked seed".to_string(),
images: None,
local_images: Vec::new(),
@@ -2686,6 +2687,7 @@ async fn thread_rollback_recomputes_previous_turn_settings_and_reference_context
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "turn 1 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -2714,6 +2716,7 @@ async fn thread_rollback_recomputes_previous_turn_settings_and_reference_context
)),
RolloutItem::EventMsg(EventMsg::UserMessage(
codex_protocol::protocol::UserMessageEvent {
client_id: None,
message: "turn 2 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -2798,6 +2801,7 @@ async fn thread_rollback_restores_cleared_reference_context_item_after_compactio
},
)),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "turn 1 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -2844,6 +2848,7 @@ async fn thread_rollback_restores_cleared_reference_context_item_after_compactio
},
)),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "turn 2 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -2900,6 +2905,7 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
},
)),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "turn 1 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -2926,6 +2932,7 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
},
)),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "turn 2 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -2952,6 +2959,7 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
},
)),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "turn 3 user".to_string(),
images: None,
local_images: Vec::new(),
@@ -5410,6 +5418,7 @@ async fn submit_with_id_captures_current_span_trace_context() {
.submit_with_id(Submission {
id: "sub-1".into(),
op: Op::Interrupt,
client_user_message_id: None,
trace: None,
})
.await
@@ -5481,6 +5490,7 @@ fn submission_dispatch_span_prefers_submission_trace_context() {
submission_dispatch_span(&Submission {
id: "sub-1".into(),
op: Op::Interrupt,
client_user_message_id: None,
trace: Some(submission_trace),
})
});
@@ -5507,6 +5517,7 @@ fn submission_dispatch_span_uses_debug_for_realtime_audio() {
item_id: None,
},
}),
client_user_message_id: None,
trace: None,
});
@@ -5574,6 +5585,7 @@ async fn user_turn_updates_approvals_reviewer() {
..Default::default()
},
},
/*client_user_message_id*/ None,
)
.await;
@@ -5867,6 +5879,7 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
let dispatch_span = submission_dispatch_span(&Submission {
id: "sub-1".into(),
op: Op::Interrupt,
client_user_message_id: None,
trace: Some(submission_trace.clone()),
});
let dispatch_span_id = dispatch_span.context().span().span_context().span_id();
@@ -5877,10 +5890,13 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
async {
sess.spawn_task(
Arc::clone(&tc),
vec![TurnInput::UserInput(vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}])],
vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}],
TraceCaptureTask {
captured_trace: Arc::clone(&captured_trace),
},
@@ -6689,10 +6705,13 @@ async fn spawn_task_does_not_update_previous_turn_settings_for_non_run_turn_task
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
sess.set_previous_turn_settings(/*previous_turn_settings*/ None)
.await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
sess.spawn_task(
Arc::clone(&tc),
@@ -7775,6 +7794,7 @@ async fn record_context_updates_and_set_reference_context_item_persists_full_rei
session
.persist_rollout_items(&[RolloutItem::EventMsg(EventMsg::UserMessage(
UserMessageEvent {
client_id: None,
message: "seed rollout".to_string(),
images: None,
local_images: Vec::new(),
@@ -7981,10 +8001,13 @@ impl SessionTask for GuardianDeniedApprovalTask {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn guardian_auto_review_interrupts_after_three_consecutive_denials() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "trigger guardian denials".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "trigger guardian denials".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
sess.spawn_task(Arc::clone(&tc), input, GuardianDeniedApprovalTask)
.await;
@@ -8012,10 +8035,13 @@ async fn guardian_auto_review_interrupts_after_three_consecutive_denials() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn guardian_helper_review_interrupts_after_three_consecutive_denials() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "keep turn active for helper reviews".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "keep turn active for helper reviews".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
sess.spawn_task(
Arc::clone(&tc),
input,
@@ -8072,10 +8098,13 @@ async fn guardian_helper_review_interrupts_after_three_consecutive_denials() {
#[test_log::test]
async fn abort_regular_task_emits_marker_before_turn_aborted() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
sess.spawn_task(
Arc::clone(&tc),
input,
@@ -8110,10 +8139,13 @@ async fn abort_regular_task_emits_marker_before_turn_aborted() {
#[tokio::test]
async fn abort_gracefully_emits_marker_before_turn_aborted() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
sess.spawn_task(
Arc::clone(&tc),
input,
@@ -8148,10 +8180,13 @@ async fn abort_gracefully_emits_marker_before_turn_aborted() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
sess.spawn_task(
Arc::clone(&tc),
input,
@@ -8176,6 +8211,7 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
pending_user_input.clone(),
/*additional_context*/ Default::default(),
Some(&tc.sub_id),
/*client_user_message_id*/ None,
/*responsesapi_client_metadata*/ None,
)
.await
@@ -8235,6 +8271,7 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
assert!(matches!(
fourth.msg,
EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message,
images,
text_elements,
@@ -8354,6 +8391,7 @@ async fn steer_input_requires_active_turn() {
input,
/*additional_context*/ Default::default(),
/*expected_turn_id*/ None,
/*client_user_message_id*/ None,
/*responsesapi_client_metadata*/ None,
)
.await
@@ -8365,10 +8403,13 @@ async fn steer_input_requires_active_turn() {
#[tokio::test]
async fn steer_input_enforces_expected_turn_id() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
sess.spawn_task(
Arc::clone(&tc),
input,
@@ -8388,6 +8429,7 @@ async fn steer_input_enforces_expected_turn_id() {
steer_input,
/*additional_context*/ Default::default(),
Some("different-turn-id"),
/*client_user_message_id*/ None,
/*responsesapi_client_metadata*/ None,
)
.await
@@ -8411,10 +8453,13 @@ async fn steer_input_rejects_non_regular_turns() {
(TaskKind::Compact, NonSteerableTurnKind::Compact),
] {
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
let turn_context = sess.new_default_turn_with_sub_id("turn".to_string()).await;
sess.spawn_task(
turn_context,
@@ -8435,6 +8480,7 @@ async fn steer_input_rejects_non_regular_turns() {
steer_input,
/*additional_context*/ Default::default(),
/*expected_turn_id*/ None,
/*client_user_message_id*/ None,
/*responsesapi_client_metadata*/ None,
)
.await
@@ -8449,10 +8495,13 @@ async fn steer_input_rejects_non_regular_turns() {
#[tokio::test]
async fn steer_input_returns_active_turn_id() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
sess.spawn_task(
Arc::clone(&tc),
input,
@@ -8472,6 +8521,7 @@ async fn steer_input_returns_active_turn_id() {
steer_input,
/*additional_context*/ Default::default(),
Some(&tc.sub_id),
/*client_user_message_id*/ None,
/*responsesapi_client_metadata*/ None,
)
.await
@@ -9463,6 +9513,7 @@ async fn steered_input_reopens_mailbox_delivery_for_current_turn() {
}],
/*additional_context*/ Default::default(),
Some(&tc.sub_id),
/*client_user_message_id*/ None,
/*responsesapi_client_metadata*/ None,
)
.await
@@ -9471,10 +9522,13 @@ async fn steered_input_reopens_mailbox_delivery_for_current_turn() {
assert_eq!(
sess.input_queue.get_pending_input(&sess.active_turn).await,
vec![
TurnInput::UserInput(vec![UserInput::Text {
text: "follow up".to_string(),
text_elements: Vec::new(),
}]),
TurnInput::UserInput {
content: vec![UserInput::Text {
text: "follow up".to_string(),
text_elements: Vec::new(),
}],
client_id: None
},
TurnInput::ResponseItem(ResponseItem::from(communication.to_response_input_item())),
],
);
@@ -9513,6 +9567,7 @@ async fn stale_defer_mailbox_delivery_does_not_override_steered_input() {
}],
/*additional_context*/ Default::default(),
Some(&tc.sub_id),
/*client_user_message_id*/ None,
/*responsesapi_client_metadata*/ None,
)
.await
@@ -9525,10 +9580,13 @@ async fn stale_defer_mailbox_delivery_does_not_override_steered_input() {
assert_eq!(
sess.input_queue.get_pending_input(&sess.active_turn).await,
vec![
TurnInput::UserInput(vec![UserInput::Text {
text: "follow up".to_string(),
text_elements: Vec::new(),
}]),
TurnInput::UserInput {
content: vec![UserInput::Text {
text: "follow up".to_string(),
text_elements: Vec::new(),
}],
client_id: None
},
TurnInput::ResponseItem(ResponseItem::from(communication.to_response_input_item())),
],
);
@@ -9593,10 +9651,13 @@ async fn tool_calls_reopen_mailbox_delivery_for_current_turn() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![TurnInput::UserInput(vec![UserInput::Text {
text: "start review".to_string(),
text_elements: Vec::new(),
}])];
let input = vec![TurnInput::UserInput {
content: vec![UserInput::Text {
text: "start review".to_string(),
text_elements: Vec::new(),
}],
client_id: None,
}];
sess.spawn_task(Arc::clone(&tc), input, ReviewTask::new())
.await;
+3 -3
View File
@@ -429,7 +429,7 @@ async fn run_hooks_and_record_inputs(
blocked_input = true;
record_additional_contexts(sess, turn_context, hook_outcome.additional_contexts).await;
} else {
if matches!(input_item, TurnInput::UserInput(items) if !items.is_empty()) {
if matches!(input_item, TurnInput::UserInput { content, .. } if !content.is_empty()) {
accepted_user_input = true;
}
record_pending_input(
@@ -457,7 +457,7 @@ async fn build_skills_and_plugins(
let user_input = input
.iter()
.filter_map(|item| match item {
TurnInput::UserInput(content) => Some(content.as_slice()),
TurnInput::UserInput { content, .. } => Some(content.as_slice()),
TurnInput::ResponseItem(_) => None,
})
.flatten()
@@ -610,7 +610,7 @@ async fn track_turn_resolved_config_analytics(
num_input_images: input
.iter()
.filter_map(|item| match item {
TurnInput::UserInput(content) => Some(content.as_slice()),
TurnInput::UserInput { content, .. } => Some(content.as_slice()),
TurnInput::ResponseItem(_) => None,
})
.flatten()
+1 -1
View File
@@ -72,7 +72,7 @@ impl SessionTask for ReviewTask {
let mut user_input = Vec::new();
for item in input {
match item {
TurnInput::UserInput(mut content) => user_input.append(&mut content),
TurnInput::UserInput { mut content, .. } => user_input.append(&mut content),
TurnInput::ResponseItem(_) => {}
}
}
@@ -1124,6 +1124,7 @@ fn multi_agent_v2_interrupted_marker_uses_developer_input_message() {
fn completed_legacy_event_history_is_not_mid_turn() {
let completed_history = InitialHistory::Forked(vec![
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello".to_string(),
images: None,
text_elements: Vec::new(),
@@ -1152,6 +1153,7 @@ fn mixed_response_and_legacy_user_event_history_is_mid_turn() {
let mixed_history = InitialHistory::Forked(vec![
RolloutItem::ResponseItem(user_msg("hello")),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello".to_string(),
images: None,
text_elements: Vec::new(),
@@ -152,6 +152,7 @@ async fn steer_user_input(codex: &CodexThread, text: &str) {
}],
/*additional_context*/ Default::default(),
/*expected_turn_id*/ None,
/*client_user_message_id*/ None,
/*responsesapi_client_metadata*/ None,
)
.await
@@ -84,6 +84,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R
let user_event = RolloutLine {
timestamp: TEST_TIMESTAMP.to_string(),
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello".to_string(),
images: None,
local_images: Vec::new(),
@@ -56,6 +56,7 @@ fn resume_history(
collaboration_mode_kind: ModeKind::Default,
})),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "seed".to_string(),
images: None,
local_images: vec![],
@@ -236,6 +236,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
RolloutLine {
timestamp: "2026-01-27T12:00:01Z".to_string(),
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
client_id: None,
message: "hello from backfill".to_string(),
images: None,
local_images: Vec::new(),
+1
View File
@@ -198,6 +198,7 @@ impl AppServerClient {
request_id: request_id.clone(),
params: TurnStartParams {
thread_id: thread_id.to_string(),
client_user_message_id: None,
input: vec![UserInput::Text {
text,
// Debug client sends plain text with no UI markup spans.
+1
View File
@@ -779,6 +779,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
request_id: request_ids.next(),
params: TurnStartParams {
thread_id: primary_thread_id_for_span.clone(),
client_user_message_id: None,
input: items.into_iter().map(Into::into).collect(),
responsesapi_client_metadata: None,
additional_context: None,
@@ -78,6 +78,7 @@ fn rollout_items_from_messages(messages: &[ConversationMessage]) -> Vec<RolloutI
items.push(RolloutItem::ResponseItem(response_item));
items.push(RolloutItem::EventMsg(EventMsg::UserMessage(
UserMessageEvent {
client_id: None,
message: message.text.clone(),
images: None,
local_images: Vec::new(),
@@ -114,6 +114,7 @@ pub async fn run_codex_tool_session(
additional_context: Default::default(),
thread_settings: Default::default(),
},
client_user_message_id: None,
trace: None,
};
@@ -539,6 +539,7 @@ impl MessageProcessor {
.submit_with_id(Submission {
id: request_id_string,
op: codex_protocol::protocol::Op::Interrupt,
client_user_message_id: None,
trace: None,
})
.await
+6
View File
@@ -56,6 +56,9 @@ pub enum TurnItem {
#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)]
pub struct UserMessageItem {
pub id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub client_id: Option<String>,
pub content: Vec<UserInput>,
}
@@ -237,6 +240,7 @@ impl UserMessageItem {
pub fn new(content: &[UserInput]) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
client_id: None,
content: content.to_vec(),
}
}
@@ -245,6 +249,7 @@ impl UserMessageItem {
// Legacy user-message events flatten only text inputs into `message` and
// rebase text element ranges onto that concatenated text.
EventMsg::UserMessage(UserMessageEvent {
client_id: self.client_id.clone(),
message: self.message(),
images: Some(self.image_urls()),
image_details: self.image_details(),
@@ -272,6 +277,7 @@ impl UserMessageItem {
if let UserInput::Text {
text,
text_elements,
..
} = input
{
// Text element ranges are relative to each text chunk; offset them so they align
+4 -2
View File
@@ -1237,7 +1237,9 @@ impl From<Vec<UserInput>> for ResponseInputItem {
.into_iter()
.flat_map(|c| match c {
UserInput::Text { text, .. } => vec![ContentItem::InputText { text }],
UserInput::Image { image_url, detail } => {
UserInput::Image {
image_url, detail, ..
} => {
image_index += 1;
let detail = detail.unwrap_or(DEFAULT_IMAGE_DETAIL);
vec![ContentItem::InputImage {
@@ -1245,7 +1247,7 @@ impl From<Vec<UserInput>> for ResponseInputItem {
detail: Some(detail),
}]
}
UserInput::LocalImage { path, detail } => {
UserInput::LocalImage { path, detail, .. } => {
image_index += 1;
let detail = detail.unwrap_or(DEFAULT_IMAGE_DETAIL);
match std::fs::read(&path) {
+9 -1
View File
@@ -129,6 +129,9 @@ pub struct Submission {
pub id: String,
/// Payload
pub op: Op,
/// Client-provided id for the user message represented by `Op::UserInput`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub client_user_message_id: Option<String>,
/// Optional W3C trace carrier propagated across async submission handoffs.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub trace: Option<W3cTraceContext>,
@@ -2164,6 +2167,8 @@ pub struct AgentMessageEvent {
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, TS)]
pub struct UserMessageEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub client_id: Option<String>,
pub message: String,
/// Image URLs sourced from `UserInput::Image`. These are safe
/// to replay in legacy UI history events and correspond to images sent to
@@ -5088,6 +5093,7 @@ mod tests {
#[test]
fn user_message_event_serializes_empty_metadata_vectors() -> Result<()> {
let event = UserMessageEvent {
client_id: None,
message: "hello".to_string(),
images: None,
local_images: Vec::new(),
@@ -5133,7 +5139,7 @@ mod tests {
#[test]
fn user_message_item_legacy_event_preserves_image_details() {
let local_path = PathBuf::from("/tmp/local.png");
let item = UserMessageItem::new(&[
let mut item = UserMessageItem::new(&[
crate::user_input::UserInput::Image {
image_url: "https://example.com/first.png".to_string(),
detail: Some(ImageDetail::Original),
@@ -5147,6 +5153,7 @@ mod tests {
detail: Some(ImageDetail::Original),
},
]);
item.client_id = Some("client-message-1".to_string());
let EventMsg::UserMessage(event) = item.as_legacy_event() else {
panic!("expected user message event");
@@ -5159,6 +5166,7 @@ mod tests {
"https://example.com/second.png".to_string(),
])
);
assert_eq!(event.client_id, Some("client-message-1".to_string()));
assert_eq!(event.image_details, vec![Some(ImageDetail::Original)]);
assert_eq!(event.local_images, vec![local_path]);
assert_eq!(event.local_image_details, vec![Some(ImageDetail::Original)]);

Some files were not shown because too many files have changed in this diff Show More