diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index c20475235..5b1d9034f 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -831,6 +831,12 @@ client_request_definitions! { serialization: thread_id(params.thread_id), response: v2::ThreadRealtimeAppendTextResponse, }, + #[experimental("thread/realtime/appendSpeech")] + ThreadRealtimeAppendSpeech => "thread/realtime/appendSpeech" { + params: v2::ThreadRealtimeAppendSpeechParams, + serialization: thread_id(params.thread_id), + response: v2::ThreadRealtimeAppendSpeechResponse, + }, #[experimental("thread/realtime/stop")] ThreadRealtimeStop => "thread/realtime/stop" { params: v2::ThreadRealtimeStopParams, @@ -3032,6 +3038,8 @@ mod tests { request_id: RequestId::Integer(9), params: v2::ThreadRealtimeStartParams { architecture: Some(RealtimeConversationArchitecture::Avas), + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: "thr_123".to_string(), model: Some("realtime-treatment-model".to_string()), output_modality: RealtimeOutputModality::Audio, @@ -3049,6 +3057,8 @@ mod tests { "params": { "architecture": "avas", "threadId": "thr_123", + "codexResponsesAsItems": null, + "codexResponseItemPrefix": null, "model": "realtime-treatment-model", "outputModality": "audio", "prompt": "You are on a call", @@ -3069,6 +3079,8 @@ mod tests { request_id: RequestId::Integer(9), params: v2::ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: "thr_123".to_string(), model: None, output_modality: RealtimeOutputModality::Audio, @@ -3086,6 +3098,8 @@ mod tests { "params": { "architecture": null, "threadId": "thr_123", + "codexResponsesAsItems": null, + "codexResponseItemPrefix": null, "model": null, "outputModality": "audio", "realtimeSessionId": null, @@ -3101,6 +3115,8 @@ mod tests { request_id: RequestId::Integer(9), params: v2::ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: "thr_123".to_string(), model: None, output_modality: RealtimeOutputModality::Audio, @@ -3118,6 +3134,8 @@ mod tests { "params": { "architecture": null, "threadId": "thr_123", + "codexResponsesAsItems": null, + "codexResponseItemPrefix": null, "model": null, "outputModality": "audio", "prompt": null, @@ -3166,6 +3184,29 @@ mod tests { Ok(()) } + #[test] + fn serialize_thread_realtime_append_speech() -> Result<()> { + let request = ClientRequest::ThreadRealtimeAppendSpeech { + request_id: RequestId::Integer(10), + params: v2::ThreadRealtimeAppendSpeechParams { + thread_id: "thr_123".to_string(), + text: "Short voice update".to_string(), + }, + }; + assert_eq!( + json!({ + "method": "thread/realtime/appendSpeech", + "id": 10, + "params": { + "threadId": "thr_123", + "text": "Short voice update" + } + }), + serde_json::to_value(&request)?, + ); + Ok(()) + } + #[test] fn serialize_thread_status_changed_notification() -> Result<()> { let notification = @@ -3276,6 +3317,8 @@ mod tests { request_id: RequestId::Integer(1), params: v2::ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: "thr_123".to_string(), model: None, output_modality: RealtimeOutputModality::Audio, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/realtime.rs b/codex-rs/app-server-protocol/src/protocol/v2/realtime.rs index bde44f9a0..5bf69285f 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/realtime.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/realtime.rs @@ -70,6 +70,12 @@ pub struct ThreadRealtimeStartParams { /// Overrides the configured realtime architecture for this session only. #[ts(optional = nullable)] pub architecture: Option, + /// Sends automatic Codex responses as realtime conversation items instead of handoff appends. + #[ts(optional = nullable)] + pub codex_responses_as_items: Option, + /// Optional prefix added to automatic Codex response items when `codexResponsesAsItems` is true. + #[ts(optional = nullable)] + pub codex_response_item_prefix: Option, /// Overrides the configured realtime model for this session only. #[ts(optional = nullable)] pub model: Option, @@ -146,6 +152,21 @@ pub struct ThreadRealtimeAppendTextParams { #[ts(export_to = "v2/")] pub struct ThreadRealtimeAppendTextResponse {} +/// EXPERIMENTAL - append speakable text to thread realtime. +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadRealtimeAppendSpeechParams { + pub thread_id: String, + pub text: String, +} + +/// EXPERIMENTAL - response for appending realtime speech. +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadRealtimeAppendSpeechResponse {} + /// EXPERIMENTAL - stop thread realtime. #[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 8ad2ba7b6..b9c70860e 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -165,9 +165,10 @@ Example with notification opt-out: - `thread/inject_items` — append raw Responses API items to a loaded thread’s model-visible history without starting a user turn; returns `{}` on success. - `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. `clientUserMessageId` is optional; when supplied, the corresponding `userMessage` item echoes it as `clientId`. Review and manual compaction turns reject `turn/steer`. - `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`. -- `thread/realtime/start` — start a thread-scoped realtime session (experimental); pass `outputModality: "text"` or `outputModality: "audio"` to choose model output, and optionally pass `model` and `version` to override configured realtime selection for this session only. Returns `{}` and streams `thread/realtime/*` notifications. Omit `transport` for the websocket transport, or pass `{ "type": "webrtc", "sdp": "..." }` to create a WebRTC session from a browser-generated SDP offer; the remote answer SDP is emitted as `thread/realtime/sdp`. +- `thread/realtime/start` — start a thread-scoped realtime session (experimental); pass `outputModality: "text"` or `outputModality: "audio"` to choose model output, and optionally pass `model` and `version` to override configured realtime selection for this session only. By default, automatic Codex text follows the protocol's speakable output path. Pass `codexResponsesAsItems: true` to send automatic Codex responses as realtime conversation items instead, and optionally pass `codexResponseItemPrefix` to prepend experiment instructions to those items. Returns `{}` and streams `thread/realtime/*` notifications. Omit `transport` for the websocket transport, or pass `{ "type": "webrtc", "sdp": "..." }` to create a WebRTC session from a browser-generated SDP offer; the remote answer SDP is emitted as `thread/realtime/sdp`. - `thread/realtime/appendAudio` — append an input audio chunk to the active realtime session (experimental); returns `{}`. - `thread/realtime/appendText` — append text input to the active realtime session with a required `role` of `user` or `developer` (experimental); returns `{}`. Older clients that omit `role` default to `user`. +- `thread/realtime/appendSpeech` — append text that the realtime model should speak to the user (experimental); returns `{}`. - `thread/realtime/stop` — stop the active realtime session for the thread (experimental); returns `{}`. - `review/start` — kick off Codex’s automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review. - `command/exec` — run a single command under the server sandbox without starting a thread/turn (handy for utilities and validation). @@ -878,6 +879,15 @@ Omit `prompt` to use Codex's default realtime backend prompt. Send `prompt: null `prompt: ""` when the session should start without that default backend prompt. Clients may also pass `model` and `version` on `thread/realtime/start` to select a different realtime session configuration without changing thread or user config. +Pass `codexResponsesAsItems: true` to inject automatic Codex responses with +`conversation.item.create` instead of the protocol's default speakable output +path. When using that mode, `codexResponseItemPrefix` can prepend short +experiment instructions to each automatic Codex response item. Omit +`codexResponsesAsItems`, or pass `false`, to preserve the default speakable +behavior. Call +`thread/realtime/appendText` to append app-provided realtime text items, or +`thread/realtime/appendSpeech` when the app decides a realtime update should be +spoken. ```javascript await pc.setRemoteDescription({ diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 12bded760..603810f24 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -1317,6 +1317,11 @@ impl MessageProcessor { .thread_realtime_append_text(&request_id, params) .await } + ClientRequest::ThreadRealtimeAppendSpeech { params, .. } => { + self.turn_processor + .thread_realtime_append_speech(&request_id, params) + .await + } ClientRequest::ThreadRealtimeStop { params, .. } => { self.turn_processor .thread_realtime_stop(&request_id, params) diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 4ffdb9ecf..640e975a0 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -225,6 +225,8 @@ use codex_app_server_protocol::ThreadReadParams; use codex_app_server_protocol::ThreadReadResponse; use codex_app_server_protocol::ThreadRealtimeAppendAudioParams; use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse; +use codex_app_server_protocol::ThreadRealtimeAppendSpeechParams; +use codex_app_server_protocol::ThreadRealtimeAppendSpeechResponse; use codex_app_server_protocol::ThreadRealtimeAppendTextParams; use codex_app_server_protocol::ThreadRealtimeAppendTextResponse; use codex_app_server_protocol::ThreadRealtimeListVoicesResponse; @@ -397,6 +399,7 @@ use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::permissions::FileSystemSandboxPolicy; use codex_protocol::protocol::AgentStatus; use codex_protocol::protocol::ConversationAudioParams; +use codex_protocol::protocol::ConversationSpeechParams; use codex_protocol::protocol::ConversationStartParams; use codex_protocol::protocol::ConversationStartTransport; use codex_protocol::protocol::ConversationTextParams; diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index ccb45c367..e6b626aef 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -182,6 +182,16 @@ impl TurnRequestProcessor { .map(|response| response.map(Into::into)) } + pub(crate) async fn thread_realtime_append_speech( + &self, + request_id: &ConnectionRequestId, + params: ThreadRealtimeAppendSpeechParams, + ) -> Result, JSONRPCErrorError> { + self.thread_realtime_append_speech_inner(request_id, params) + .await + .map(|response| response.map(Into::into)) + } + pub(crate) async fn thread_realtime_stop( &self, request_id: &ConnectionRequestId, @@ -942,6 +952,8 @@ impl TurnRequestProcessor { thread.as_ref(), Op::RealtimeConversationStart(ConversationStartParams { architecture: params.architecture, + codex_responses_as_items: params.codex_responses_as_items.unwrap_or(false), + codex_response_item_prefix: params.codex_response_item_prefix, model: params.model, output_modality: params.output_modality, prompt: params.prompt, @@ -1018,6 +1030,31 @@ impl TurnRequestProcessor { Ok(Some(ThreadRealtimeAppendTextResponse::default())) } + async fn thread_realtime_append_speech_inner( + &self, + request_id: &ConnectionRequestId, + params: ThreadRealtimeAppendSpeechParams, + ) -> Result, JSONRPCErrorError> { + let Some((_, thread)) = self + .prepare_realtime_conversation_thread(request_id, ¶ms.thread_id) + .await? + else { + return Ok(None); + }; + self.submit_core_op( + request_id, + thread.as_ref(), + Op::RealtimeConversationSpeech(ConversationSpeechParams { text: params.text }), + ) + .await + .map_err(|err| { + internal_error(format!( + "failed to append realtime conversation speech: {err}" + )) + })?; + Ok(Some(ThreadRealtimeAppendSpeechResponse::default())) + } + async fn thread_realtime_stop_inner( &self, request_id: &ConnectionRequestId, diff --git a/codex-rs/app-server/tests/common/test_app_server.rs b/codex-rs/app-server/tests/common/test_app_server.rs index 33383ad2c..e940d9a42 100644 --- a/codex-rs/app-server/tests/common/test_app_server.rs +++ b/codex-rs/app-server/tests/common/test_app_server.rs @@ -90,6 +90,7 @@ use codex_app_server_protocol::ThreadMemoryModeSetParams; use codex_app_server_protocol::ThreadMetadataUpdateParams; use codex_app_server_protocol::ThreadReadParams; use codex_app_server_protocol::ThreadRealtimeAppendAudioParams; +use codex_app_server_protocol::ThreadRealtimeAppendSpeechParams; use codex_app_server_protocol::ThreadRealtimeAppendTextParams; use codex_app_server_protocol::ThreadRealtimeListVoicesParams; use codex_app_server_protocol::ThreadRealtimeStartParams; @@ -1036,6 +1037,16 @@ impl TestAppServer { .await } + /// Send a `thread/realtime/appendSpeech` JSON-RPC request (v2). + pub async fn send_thread_realtime_append_speech_request( + &mut self, + params: ThreadRealtimeAppendSpeechParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/realtime/appendSpeech", params) + .await + } + /// Send a `thread/realtime/stop` JSON-RPC request (v2). pub async fn send_thread_realtime_stop_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/experimental_api.rs b/codex-rs/app-server/tests/suite/v2/experimental_api.rs index 9e45cf256..417fa5a39 100644 --- a/codex-rs/app-server/tests/suite/v2/experimental_api.rs +++ b/codex-rs/app-server/tests/suite/v2/experimental_api.rs @@ -80,6 +80,8 @@ async fn realtime_conversation_start_requires_experimental_api_capability() -> R let request_id = mcp .send_thread_realtime_start_request(ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: "thr_123".to_string(), model: None, output_modality: RealtimeOutputModality::Audio, @@ -189,6 +191,8 @@ async fn realtime_webrtc_start_requires_experimental_api_capability() -> Result< let request_id = mcp .send_thread_realtime_start_request(ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: "thr_123".to_string(), model: None, output_modality: RealtimeOutputModality::Audio, diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 98549ef36..6f9ed7f67 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -15,6 +15,8 @@ use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadRealtimeAppendAudioParams; use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse; +use codex_app_server_protocol::ThreadRealtimeAppendSpeechParams; +use codex_app_server_protocol::ThreadRealtimeAppendSpeechResponse; use codex_app_server_protocol::ThreadRealtimeAppendTextParams; use codex_app_server_protocol::ThreadRealtimeAppendTextResponse; use codex_app_server_protocol::ThreadRealtimeAudioChunk; @@ -82,6 +84,8 @@ const V2_STEERING_ACKNOWLEDGEMENT: &str = "This was sent to steer the previous background agent task."; const V2_HANDOFF_COMPLETE_ACKNOWLEDGEMENT: &str = "Background agent finished. Use the preceding [BACKEND] messages as the result."; +const RESPONSE_ITEM_PREFIX: &str = + "Use the following context to inform future responses, but do not speak it to the user."; #[derive(Debug, Clone, Copy)] enum StartupContextConfig<'a> { @@ -309,6 +313,28 @@ impl RealtimeE2eHarness { } async fn start_webrtc_realtime(&mut self, offer_sdp: &str) -> Result { + self.start_webrtc_realtime_with_codex_responses_as_items( + offer_sdp, /*codex_responses_as_items*/ None, + ) + .await + } + + async fn start_webrtc_realtime_with_codex_response_items( + &mut self, + offer_sdp: &str, + ) -> Result { + self.start_webrtc_realtime_with_codex_responses_as_items( + offer_sdp, + /*codex_responses_as_items*/ Some(true), + ) + .await + } + + async fn start_webrtc_realtime_with_codex_responses_as_items( + &mut self, + offer_sdp: &str, + codex_responses_as_items: Option, + ) -> Result { // Starts realtime through the public JSON-RPC method, then waits for the same client-visible // notifications a desktop app needs: started first, SDP answer second. let start_request_id = self @@ -316,6 +342,10 @@ impl RealtimeE2eHarness { .send_thread_realtime_start_request(ThreadRealtimeStartParams { architecture: None, thread_id: self.thread_id.clone(), + codex_response_item_prefix: codex_responses_as_items + .unwrap_or(false) + .then(|| RESPONSE_ITEM_PREFIX.to_string()), + codex_responses_as_items, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -407,6 +437,24 @@ impl RealtimeE2eHarness { Ok(()) } + async fn append_speech(&mut self, thread_id: String, text: &str) -> Result<()> { + let request_id = self + .mcp + .send_thread_realtime_append_speech_request(ThreadRealtimeAppendSpeechParams { + thread_id, + text: text.to_string(), + }) + .await?; + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + self.mcp + .read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let _: ThreadRealtimeAppendSpeechResponse = to_response(response)?; + Ok(()) + } + async fn main_loop_responses_requests(&self) -> Result> { responses_requests(&self.main_loop_responses_server).await } @@ -564,6 +612,8 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { let start_request_id = mcp .send_thread_realtime_start_request(ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: thread_start.thread.id.clone(), model: Some("realtime-treatment-model".to_string()), output_modality: RealtimeOutputModality::Audio, @@ -840,6 +890,8 @@ async fn realtime_text_output_modality_requests_text_output_and_final_transcript let start_request_id = mcp .send_thread_realtime_start_request(ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: thread_start.thread.id.clone(), model: None, output_modality: RealtimeOutputModality::Text, @@ -1017,6 +1069,8 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> { let start_request_id = mcp .send_thread_realtime_start_request(ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: thread_start.thread.id.clone(), model: None, output_modality: RealtimeOutputModality::Audio, @@ -1117,6 +1171,8 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> { let start_request_id = mcp .send_thread_realtime_start_request(ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: thread_id.clone(), model: None, output_modality: RealtimeOutputModality::Audio, @@ -1291,7 +1347,64 @@ async fn webrtc_v1_start_posts_offer_returns_sdp_and_joins_sideband() -> Result< } #[tokio::test] -async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> { +async fn webrtc_v1_default_automatic_output_uses_handoff_append() -> Result<()> { + skip_if_no_network!(Ok(())); + + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V1, + main_loop_responses(vec![create_final_assistant_message_sse_response( + "legacy automatic speech", + )?]), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![session_updated("sess_v1_default_handoff")], + vec![], + vec![], + ])]), + ) + .await?; + + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V1); + assert_v1_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?; + + let turn_request_id = harness + .mcp + .send_turn_start_request(TurnStartParams { + thread_id: harness.thread_id.clone(), + input: vec![V2UserInput::Text { + text: "say the default output".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let turn_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + harness + .mcp + .read_stream_until_response_message(RequestId::Integer(turn_request_id)), + ) + .await??; + let _: TurnStartResponse = to_response(turn_response)?; + let _ = harness + .read_notification::("turn/completed") + .await?; + + assert_eq!( + harness.sideband_outbound_request(/*request_index*/ 1).await, + json!({ + "type": "conversation.handoff.append", + "handoff_id": "codex", + "output_text": "legacy automatic speech", + }) + ); + + harness.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn webrtc_v1_handoff_request_delegates_context_and_manual_append_speaks() -> Result<()> { skip_if_no_network!(Ok(())); // Phase 1: script one v1 handoff request on the sideband and one delegated Responses turn. @@ -1323,11 +1436,14 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> }), ], vec![], + vec![], ])]), ) .await?; - let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + let started = harness + .start_webrtc_realtime_with_codex_response_items("v=offer\r\n") + .await?; assert_eq!(started.started.version, RealtimeConversationVersion::V1); assert_call_create_multipart( harness.call_capture.single_request(), @@ -1346,8 +1462,8 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> .await?; assert_eq!(turn_completed.thread_id, harness.thread_id); - // Phase 3: assert the delegated prompt went to Responses, then the v1 handoff append went back - // over the existing sideband connection. + // Phase 3: assert the delegated prompt went to Responses, then the automatic v1 output went + // back over the existing sideband connection as a conversation item. let requests = harness.main_loop_responses_requests().await?; assert_eq!(requests.len(), 1); assert!( @@ -1358,13 +1474,32 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> "delegated Responses request should contain realtime delegation envelope: {}", requests[0] ); - let handoff_append = harness.sideband_outbound_request(/*request_index*/ 1).await; + let context_update = harness.sideband_outbound_request(/*request_index*/ 1).await; assert_eq!( - handoff_append, + context_update, + json!({ + "type": "conversation.item.create", + "item": { + "type": "message", + "role": "developer", + "content": [{ + "type": "input_text", + "text": format!("{RESPONSE_ITEM_PREFIX}\n\ndelegated from v1") + }] + } + }) + ); + + harness + .append_speech(harness.thread_id.clone(), "manual spoken v1 update") + .await?; + let spoken_append = harness.sideband_outbound_request(/*request_index*/ 2).await; + assert_eq!( + spoken_append, json!({ "type": "conversation.handoff.append", - "handoff_id": "handoff_v1", - "output_text": "\"Agent Final Message\":\n\ndelegated from v1", + "handoff_id": "codex", + "output_text": "manual spoken v1 update", }) ); @@ -1373,131 +1508,234 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> } #[tokio::test] -async fn webrtc_assistant_output_without_handoff_reaches_realtime() -> Result<()> { +async fn realtime_automatic_standalone_output_is_item_and_append_speaks() -> Result<()> { + skip_if_no_network!(Ok(())); + + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V2, + main_loop_responses(vec![create_final_assistant_message_sse_response( + "automatic output", + )?]), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![session_updated("sess_manual_handoff")], + vec![], + vec![], + vec![], + ])]), + ) + .await?; + + let started = harness + .start_webrtc_realtime_with_codex_response_items("v=offer\r\n") + .await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V2); + assert_eq!( + harness.sideband_outbound_request(/*request_index*/ 0).await["type"].as_str(), + Some("session.update") + ); + + let turn_request_id = harness + .mcp + .send_turn_start_request(TurnStartParams { + thread_id: harness.thread_id.clone(), + input: vec![V2UserInput::Text { + text: "do something quietly".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let turn_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + harness + .mcp + .read_stream_until_response_message(RequestId::Integer(turn_request_id)), + ) + .await??; + let _: TurnStartResponse = to_response(turn_response)?; + let _ = harness + .read_notification::("turn/completed") + .await?; + + assert_v2_backend_item_update( + &harness.sideband_outbound_request(/*request_index*/ 1).await, + "automatic output", + ); + let automatic_response_create = timeout( + Duration::from_millis(200), + harness + .realtime_server + .wait_for_request(/*connection_index*/ 0, /*request_index*/ 2), + ) + .await; + assert!( + automatic_response_create.is_err(), + "automatic item should not request a realtime response" + ); + + harness + .append_speech(harness.thread_id.clone(), "manual voice update") + .await?; + assert_v2_progress_update( + &harness.sideband_outbound_request(/*request_index*/ 2).await, + "manual voice update", + ); + assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 3).await); + + harness.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn realtime_automatic_handoff_output_is_item_and_append_speaks() -> Result<()> { + skip_if_no_network!(Ok(())); + + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V2, + main_loop_responses(vec![create_final_assistant_message_sse_response( + "automatic final response", + )?]), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![ + session_updated("sess_manual_update"), + v2_background_agent_tool_call("call_quiet", "delegate quietly"), + ], + vec![], + vec![], + vec![], + vec![], + ])]), + ) + .await?; + + let started = harness + .start_webrtc_realtime_with_codex_response_items("v=offer\r\n") + .await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V2); + assert_eq!( + harness.sideband_outbound_request(/*request_index*/ 0).await["type"].as_str(), + Some("session.update") + ); + + let turn_started = harness + .read_notification::("turn/started") + .await?; + assert_eq!(turn_started.thread_id, harness.thread_id); + let turn_completed = harness + .read_notification::("turn/completed") + .await?; + assert_eq!(turn_completed.thread_id, harness.thread_id); + + assert_v2_backend_item_update( + &harness.sideband_outbound_request(/*request_index*/ 1).await, + "automatic final response", + ); + assert_v2_function_call_output( + &harness.sideband_outbound_request(/*request_index*/ 2).await, + "call_quiet", + "", + ); + let automatic_response_create = timeout( + Duration::from_millis(200), + harness + .realtime_server + .wait_for_request(/*connection_index*/ 0, /*request_index*/ 3), + ) + .await; + assert!( + automatic_response_create.is_err(), + "automatic handoff item should not request a realtime response" + ); + + harness + .append_speech(harness.thread_id.clone(), "manual spoken update") + .await?; + assert_v2_progress_update( + &harness.sideband_outbound_request(/*request_index*/ 3).await, + "manual spoken update", + ); + assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 4).await); + + harness.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn webrtc_v2_assistant_output_without_handoff_reaches_realtime_context() -> Result<()> { skip_if_no_network!(Ok(())); let final_answer = "long output ".repeat(1_000); - for (version, expected_version, preamble) in [ - ( - RealtimeTestVersion::V1, - RealtimeConversationVersion::V1, - "direct preamble from v1", - ), - ( - RealtimeTestVersion::V2, - RealtimeConversationVersion::V2, - "direct preamble from v2", - ), - ] { - let mut harness = RealtimeE2eHarness::new( - version, - main_loop_responses(vec![responses::sse(vec![ - responses::ev_response_created("resp-1"), - json!({ - "type": "response.output_item.done", - "item": { - "type": "message", - "role": "assistant", - "id": "msg-preamble", - "phase": "commentary", - "content": [{"type": "output_text", "text": preamble}] - } - }), - responses::ev_assistant_message("msg-final", &final_answer), - responses::ev_completed("resp-1"), - ])]), - realtime_sideband(vec![realtime_sideband_connection(vec![ - vec![session_updated("sess_standalone_output")], - vec![], - match version { - RealtimeTestVersion::V1 => vec![], - RealtimeTestVersion::V2 => vec![ - json!({ - "type": "response.created", - "response": { "id": "resp_preamble" } - }), - json!({ - "type": "response.done", - "response": { "id": "resp_preamble" } - }), - ], - }, - vec![], - vec![], - ])]), - ) + let preamble = "direct preamble from v2"; + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V2, + main_loop_responses(vec![responses::sse(vec![ + responses::ev_response_created("resp-1"), + json!({ + "type": "response.output_item.done", + "item": { + "type": "message", + "role": "assistant", + "id": "msg-preamble", + "phase": "commentary", + "content": [{"type": "output_text", "text": preamble}] + } + }), + responses::ev_assistant_message("msg-final", &final_answer), + responses::ev_completed("resp-1"), + ])]), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![session_updated("sess_standalone_output")], + vec![], + vec![], + ])]), + ) + .await?; + + let started = harness + .start_webrtc_realtime_with_codex_response_items("v=offer\r\n") + .await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V2); + + let request_id = harness + .mcp + .send_turn_start_request(TurnStartParams { + thread_id: harness.thread_id.clone(), + input: vec![V2UserInput::Text { + text: "direct text turn".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + harness + .mcp + .read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let _: TurnStartResponse = to_response(response)?; + let _ = harness + .read_notification::("turn/completed") .await?; - let started = harness.start_webrtc_realtime("v=offer\r\n").await?; - assert_eq!(started.started.version, expected_version); + assert_v2_backend_item_update( + &harness.sideband_outbound_request(/*request_index*/ 1).await, + preamble, + ); + let final_request = harness.sideband_outbound_request(/*request_index*/ 2).await; + assert_eq!(final_request["type"], "conversation.item.create"); + assert_eq!(final_request["item"]["type"], "message"); + assert_eq!(final_request["item"]["role"], "developer"); + assert_eq!(final_request["item"]["content"][0]["type"], "input_text"); + let output_text = final_request["item"]["content"][0]["text"] + .as_str() + .expect("output text"); + assert!(output_text.starts_with(&format!("{RESPONSE_ITEM_PREFIX}\n\n[BACKEND] "))); + assert!(output_text.contains("tokens truncated")); + assert!(output_text.len() <= 4_000); - let request_id = harness - .mcp - .send_turn_start_request(TurnStartParams { - thread_id: harness.thread_id.clone(), - input: vec![V2UserInput::Text { - text: "direct text turn".to_string(), - text_elements: Vec::new(), - }], - ..Default::default() - }) - .await?; - let response: JSONRPCResponse = timeout( - DEFAULT_TIMEOUT, - harness - .mcp - .read_stream_until_response_message(RequestId::Integer(request_id)), - ) - .await??; - let _: TurnStartResponse = to_response(response)?; - let _ = harness - .read_notification::("turn/completed") - .await?; - - let preamble_request = harness.sideband_outbound_request(/*request_index*/ 1).await; - let output_text = match version { - RealtimeTestVersion::V1 => { - let final_request = harness.sideband_outbound_request(/*request_index*/ 2).await; - assert_eq!( - preamble_request, - json!({ - "type": "conversation.handoff.append", - "handoff_id": "codex", - "output_text": preamble, - }) - ); - assert_eq!(final_request["type"], "conversation.handoff.append"); - assert_eq!(final_request["handoff_id"], "codex"); - final_request["output_text"] - .as_str() - .expect("output text") - .to_string() - } - RealtimeTestVersion::V2 => { - assert_v2_progress_update(&preamble_request, preamble); - assert_v2_response_create( - &harness.sideband_outbound_request(/*request_index*/ 2).await, - ); - let final_request = harness.sideband_outbound_request(/*request_index*/ 3).await; - assert_eq!(final_request["type"], "conversation.item.create"); - assert_eq!(final_request["item"]["type"], "message"); - assert_eq!(final_request["item"]["role"], "user"); - assert_eq!(final_request["item"]["content"][0]["type"], "input_text"); - let output_text = final_request["item"]["content"][0]["text"] - .as_str() - .expect("output text"); - assert!(output_text.starts_with("[BACKEND] ")); - assert_v2_response_create( - &harness.sideband_outbound_request(/*request_index*/ 4).await, - ); - output_text.to_string() - } - }; - assert!(output_text.contains("tokens truncated")); - assert!(output_text.len() <= 4_000); - - harness.shutdown().await; - } + harness.shutdown().await; Ok(()) } @@ -1807,14 +2045,6 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await; assert_v2_function_call_output(&tool_output, "call_v2", V2_HANDOFF_COMPLETE_ACKNOWLEDGEMENT); - assert_eq!( - function_call_output_sideband_requests(&harness.realtime_server).len(), - 1 - ); - - // Phase 4: after the final function-call output, realtime needs an explicit - // `response.create` to produce the next user-visible response. - assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 3).await); harness.shutdown().await; Ok(()) @@ -2036,10 +2266,6 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<( "call_shell", V2_HANDOFF_COMPLETE_ACKNOWLEDGEMENT, ); - assert_eq!( - function_call_output_sideband_requests(&harness.realtime_server).len(), - 1 - ); harness.shutdown().await; Ok(()) @@ -2165,6 +2391,8 @@ async fn realtime_webrtc_start_surfaces_backend_error() -> Result<()> { let start_request_id = mcp .send_thread_realtime_start_request(ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: thread_start.thread.id, model: None, output_modality: RealtimeOutputModality::Audio, @@ -2227,6 +2455,8 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> { let start_request_id = mcp .send_thread_realtime_start_request(ThreadRealtimeStartParams { architecture: None, + codex_responses_as_items: None, + codex_response_item_prefix: None, thread_id: thread_start.thread.id.clone(), model: None, output_modality: RealtimeOutputModality::Audio, @@ -2350,18 +2580,6 @@ fn realtime_tool_ok_command() -> Vec { } } -fn function_call_output_sideband_requests(server: &WebSocketTestServer) -> Vec { - server - .single_connection() - .iter() - .map(WebSocketRequest::body_json) - .filter(|request| { - request["type"] == "conversation.item.create" - && request["item"]["type"] == "function_call_output" - }) - .collect() -} - fn assert_v2_function_call_output(request: &Value, call_id: &str, expected_output: &str) { assert_eq!( request, @@ -2393,6 +2611,27 @@ fn assert_v2_progress_update(request: &Value, expected_text: &str) { ); } +fn assert_v2_backend_item_update(request: &Value, expected_text: &str) { + assert_v2_items_update(request, &format!("[BACKEND] {expected_text}")); +} + +fn assert_v2_items_update(request: &Value, expected_text: &str) { + assert_eq!( + request, + &json!({ + "type": "conversation.item.create", + "item": { + "type": "message", + "role": "developer", + "content": [{ + "type": "input_text", + "text": format!("{RESPONSE_ITEM_PREFIX}\n\n{expected_text}") + }] + } + }) + ); +} + fn assert_v2_user_text_item(request: &Value, expected_text: &str) { assert_eq!( request, diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index a1a861c87..76f7e3437 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -32,6 +32,7 @@ use codex_protocol::error::CodexErr; use codex_protocol::error::Result as CodexResult; use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::ConversationAudioParams; +use codex_protocol::protocol::ConversationSpeechParams; use codex_protocol::protocol::ConversationStartParams; use codex_protocol::protocol::ConversationStartTransport; use codex_protocol::protocol::ConversationTextParams; @@ -103,25 +104,21 @@ enum RealtimeSessionKind { #[derive(Clone, Debug)] struct RealtimeHandoffState { - output_tx: Sender, + output_tx: Sender, active_handoff: Arc>>, last_output_text: Arc>>, + codex_responses_as_items: bool, + codex_response_item_prefix: Option, session_kind: RealtimeSessionKind, } #[derive(Debug, PartialEq, Eq)] -enum HandoffOutput { - StandaloneAssistantOutput { - output_text: String, - }, - ProgressUpdate { - handoff_id: String, - output_text: String, - }, - FinalUpdate { - handoff_id: String, - output_text: String, - }, +enum RealtimeOutbound { + StandaloneHandoff { text: String }, + HandoffUpdate { handoff_id: String, text: String }, + CompletedHandoff { handoff_id: String, text: String }, + ConversationItem { text: String }, + HandoffCompleteAck { handoff_id: String }, } #[derive(Debug, PartialEq, Eq)] @@ -196,7 +193,7 @@ struct RealtimeInputTask { writer: RealtimeWebsocketWriter, events: RealtimeWebsocketEvents, text_rx: Receiver, - handoff_output_rx: Receiver, + handoff_output_rx: Receiver, audio_rx: Receiver, events_tx: Sender, handoff_state: RealtimeHandoffState, @@ -206,16 +203,23 @@ struct RealtimeInputTask { struct RealtimeInputChannels { text_rx: Receiver, - handoff_output_rx: Receiver, + handoff_output_rx: Receiver, audio_rx: Receiver, } impl RealtimeHandoffState { - fn new(output_tx: Sender, session_kind: RealtimeSessionKind) -> Self { + fn new( + output_tx: Sender, + codex_responses_as_items: bool, + codex_response_item_prefix: Option, + session_kind: RealtimeSessionKind, + ) -> Self { Self { output_tx, active_handoff: Arc::new(Mutex::new(None)), last_output_text: Arc::new(Mutex::new(None)), + codex_responses_as_items, + codex_response_item_prefix, session_kind, } } @@ -236,6 +240,8 @@ struct RealtimeStart { api_provider: ApiProvider, architecture: RealtimeConversationArchitecture, extra_headers: Option, + codex_responses_as_items: bool, + codex_response_item_prefix: Option, realtime_call_api_provider: Option, session_config: RealtimeSessionConfig, model_client: ModelClient, @@ -290,6 +296,8 @@ impl RealtimeConversationManager { api_provider, architecture, extra_headers, + codex_responses_as_items, + codex_response_item_prefix, realtime_call_api_provider, session_config, model_client, @@ -306,12 +314,17 @@ impl RealtimeConversationManager { let (text_tx, text_rx) = async_channel::bounded::(TEXT_IN_QUEUE_CAPACITY); let (handoff_output_tx, handoff_output_rx) = - async_channel::bounded::(HANDOFF_OUT_QUEUE_CAPACITY); + async_channel::bounded::(HANDOFF_OUT_QUEUE_CAPACITY); let (events_tx, events_rx) = async_channel::bounded::(OUTPUT_EVENTS_QUEUE_CAPACITY); let realtime_active = Arc::new(AtomicBool::new(true)); - let handoff = RealtimeHandoffState::new(handoff_output_tx, session_kind); + let handoff = RealtimeHandoffState::new( + handoff_output_tx, + codex_responses_as_items, + codex_response_item_prefix, + session_kind, + ); let input_channels = RealtimeInputChannels { text_rx, handoff_output_rx, @@ -480,29 +493,34 @@ impl RealtimeConversationManager { let active_handoff = handoff.active_handoff.lock().await.clone(); let output = match active_handoff { Some(handoff_id) => { - let output_text = prefix_realtime_text( - output_text, - REALTIME_BACKEND_TEXT_PREFIX, - handoff.session_kind, - ); + let output_text = realtime_backend_output(output_text, handoff.session_kind); *handoff.last_output_text.lock().await = Some(output_text.clone()); - HandoffOutput::ProgressUpdate { - handoff_id, - output_text, + if handoff.codex_responses_as_items { + RealtimeOutbound::ConversationItem { + text: realtime_backend_item( + output_text, + handoff.codex_response_item_prefix.as_deref(), + ), + } + } else { + RealtimeOutbound::HandoffUpdate { + handoff_id, + text: output_text, + } } } None if output_text.trim().is_empty() => return Ok(()), None => { - let output_text = prefix_realtime_text( - output_text, - REALTIME_BACKEND_TEXT_PREFIX, - handoff.session_kind, - ); - HandoffOutput::StandaloneAssistantOutput { - output_text: truncate_realtime_text_to_token_budget( - &output_text, - REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET, - ), + let output_text = realtime_backend_output(output_text, handoff.session_kind); + if handoff.codex_responses_as_items { + RealtimeOutbound::ConversationItem { + text: realtime_backend_item( + output_text, + handoff.codex_response_item_prefix.as_deref(), + ), + } + } else { + RealtimeOutbound::StandaloneHandoff { text: output_text } } } }; @@ -514,6 +532,31 @@ impl RealtimeConversationManager { Ok(()) } + pub(crate) async fn append_speech(&self, text: String) -> CodexResult<()> { + if text.trim().is_empty() { + return Ok(()); + } + + let handoff = { + let guard = self.state.lock().await; + let Some(state) = guard.as_ref() else { + return Err(CodexErr::InvalidRequest( + "conversation is not running".to_string(), + )); + }; + state.handoff.clone() + }; + + handoff + .output_tx + .send(RealtimeOutbound::StandaloneHandoff { + text: realtime_backend_output(text, handoff.session_kind), + }) + .await + .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; + Ok(()) + } + pub(crate) async fn handoff_complete(&self) -> CodexResult<()> { let handoff = { let guard = self.state.lock().await; @@ -534,12 +577,18 @@ impl RealtimeConversationManager { return Ok(()); }; + let output = if handoff.codex_responses_as_items { + RealtimeOutbound::HandoffCompleteAck { handoff_id } + } else { + RealtimeOutbound::CompletedHandoff { + handoff_id, + text: output_text, + } + }; + handoff .output_tx - .send(HandoffOutput::FinalUpdate { - handoff_id, - output_text, - }) + .send(output) .await .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string())) } @@ -626,6 +675,8 @@ struct PreparedRealtimeConversationStart { api_provider: ApiProvider, architecture: RealtimeConversationArchitecture, extra_headers: Option, + codex_responses_as_items: bool, + codex_response_item_prefix: Option, realtime_call_api_provider: Option, requested_realtime_session_id: Option, version: RealtimeWsVersion, @@ -701,6 +752,8 @@ async fn prepare_realtime_start( api_provider, architecture, extra_headers, + codex_responses_as_items: params.codex_responses_as_items, + codex_response_item_prefix: params.codex_response_item_prefix, realtime_call_api_provider, requested_realtime_session_id, version, @@ -812,6 +865,19 @@ fn prefix_realtime_text(text: String, prefix: &str, session_kind: RealtimeSessio format!("{prefix}{text}") } +fn realtime_backend_output(output_text: String, session_kind: RealtimeSessionKind) -> String { + let output_text = prefix_realtime_text(output_text, REALTIME_BACKEND_TEXT_PREFIX, session_kind); + truncate_realtime_text_to_token_budget(&output_text, REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET) +} + +fn realtime_backend_item(text: String, prefix: Option<&str>) -> String { + let text = match prefix.filter(|prefix| !prefix.is_empty()) { + Some(prefix) => format!("{prefix}\n\n{text}"), + None => text, + }; + truncate_realtime_text_to_token_budget(&text, REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET) +} + fn validate_realtime_voice(version: RealtimeWsVersion, voice: RealtimeVoice) -> CodexResult<()> { let voices = RealtimeVoicesList::builtin(); let allowed = match version { @@ -846,6 +912,8 @@ async fn handle_start_inner( api_provider, architecture, extra_headers, + codex_responses_as_items, + codex_response_item_prefix, realtime_call_api_provider, requested_realtime_session_id, version, @@ -861,6 +929,8 @@ async fn handle_start_inner( api_provider, architecture, extra_headers, + codex_responses_as_items, + codex_response_item_prefix, realtime_call_api_provider, session_config, model_client: sess.services.model_client.clone(), @@ -1089,6 +1159,23 @@ pub(crate) async fn handle_text( } } +pub(crate) async fn handle_speech( + sess: &Arc, + sub_id: String, + params: ConversationSpeechParams, +) { + debug!(text = %params.text, "[realtime-text] appending realtime speech"); + if let Err(err) = sess.conversation.append_speech(params.text).await { + error!("failed to append realtime speech: {err}"); + if sess.conversation.running_state().await.is_some() { + warn!("realtime speech append failed while the session was already ending"); + } else { + send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest) + .await; + } + } +} + pub(crate) async fn handle_close(sess: &Arc, sub_id: String) { end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Requested).await; } @@ -1256,7 +1343,7 @@ async fn handle_text_input( } async fn handle_handoff_output( - handoff_output: Result, + handoff_output: Result, writer: &RealtimeWebsocketWriter, events_tx: &Sender, handoff_state: &RealtimeHandoffState, @@ -1267,45 +1354,39 @@ async fn handle_handoff_output( let result = match event_parser { RealtimeEventParser::V1 => match handoff_output { - HandoffOutput::StandaloneAssistantOutput { output_text } => { + RealtimeOutbound::StandaloneHandoff { text } => { // TODO(guinness): Use the new client event for standalone handoffs once the API changes are complete. writer - .send_conversation_handoff_append( - STANDALONE_HANDOFF_ID.to_string(), - output_text, - ) + .send_conversation_handoff_append(STANDALONE_HANDOFF_ID.to_string(), text) .await } - HandoffOutput::ProgressUpdate { - handoff_id, - output_text, - } - | HandoffOutput::FinalUpdate { - handoff_id, - output_text, - } => { + RealtimeOutbound::HandoffUpdate { handoff_id, text } + | RealtimeOutbound::CompletedHandoff { handoff_id, text } => { writer - .send_conversation_function_call_output(handoff_id, output_text) + .send_conversation_function_call_output(handoff_id, text) .await } + RealtimeOutbound::ConversationItem { text } => { + writer + .send_conversation_item_create(text, ConversationTextRole::Developer) + .await + } + RealtimeOutbound::HandoffCompleteAck { .. } => Ok(()), }, RealtimeEventParser::RealtimeV2 => match handoff_output { - HandoffOutput::StandaloneAssistantOutput { output_text } => { + RealtimeOutbound::StandaloneHandoff { text } => { if let Err(err) = writer - .send_conversation_item_create(output_text, ConversationTextRole::User) + .send_conversation_item_create(text, ConversationTextRole::User) .await { Err(err) } else { return response_create_queue - .request_create(writer, events_tx, "standalone assistant output") + .request_create(writer, events_tx, "standalone handoff") .await; } } - HandoffOutput::ProgressUpdate { - handoff_id, - output_text, - } => { + RealtimeOutbound::HandoffUpdate { handoff_id, text } => { let active_handoff = handoff_state.active_handoff.lock().await.clone(); match active_handoff { Some(active_handoff) if active_handoff == handoff_id => {} @@ -1315,12 +1396,12 @@ async fn handle_handoff_output( } } writer - .send_conversation_item_create(output_text, ConversationTextRole::User) + .send_conversation_item_create(text, ConversationTextRole::User) .await } - HandoffOutput::FinalUpdate { + RealtimeOutbound::CompletedHandoff { handoff_id, - output_text: _, + text: _, } => { if let Err(err) = writer .send_conversation_function_call_output( @@ -1336,6 +1417,16 @@ async fn handle_handoff_output( .await; } } + RealtimeOutbound::ConversationItem { text } => { + writer + .send_conversation_item_create(text, ConversationTextRole::Developer) + .await + } + RealtimeOutbound::HandoffCompleteAck { handoff_id } => { + writer + .send_conversation_function_call_output(handoff_id, String::new()) + .await + } }, }; if let Err(err) = result { @@ -1449,7 +1540,6 @@ async fn handle_realtime_server_event( match session_kind { RealtimeSessionKind::V1 => { - *handoff_state.last_output_text.lock().await = None; *handoff_state.active_handoff.lock().await = Some(handoff.handoff_id.clone()); } RealtimeSessionKind::V2 => { @@ -1477,7 +1567,6 @@ async fn handle_realtime_server_event( .await?; } None => { - *handoff_state.last_output_text.lock().await = None; *handoff_state.active_handoff.lock().await = Some(handoff.handoff_id.clone()); } diff --git a/codex-rs/core/src/realtime_conversation_tests.rs b/codex-rs/core/src/realtime_conversation_tests.rs index b67205ef8..147e0cc12 100644 --- a/codex-rs/core/src/realtime_conversation_tests.rs +++ b/codex-rs/core/src/realtime_conversation_tests.rs @@ -128,7 +128,12 @@ fn wraps_realtime_delegation_input_with_xml_escaping_without_transcript() { #[tokio::test] async fn clears_active_handoff_explicitly() { let (tx, _rx) = bounded(1); - let state = RealtimeHandoffState::new(tx, RealtimeSessionKind::V1); + let state = RealtimeHandoffState::new( + tx, + /*codex_responses_as_items*/ false, + /*codex_response_item_prefix*/ None, + RealtimeSessionKind::V1, + ); *state.active_handoff.lock().await = Some("handoff_1".to_string()); assert_eq!( diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index d4525cf45..4ed79d04e 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -1,5 +1,6 @@ use crate::realtime_conversation::handle_audio as handle_realtime_conversation_audio; use crate::realtime_conversation::handle_close as handle_realtime_conversation_close; +use crate::realtime_conversation::handle_speech as handle_realtime_conversation_speech; use crate::realtime_conversation::handle_start as handle_realtime_conversation_start; use crate::realtime_conversation::handle_text as handle_realtime_conversation_text; use async_channel::Receiver; @@ -737,6 +738,10 @@ pub(super) async fn submission_loop( handle_realtime_conversation_text(&sess, sub.id.clone(), params).await; false } + Op::RealtimeConversationSpeech(params) => { + handle_realtime_conversation_speech(&sess, sub.id.clone(), params).await; + false + } Op::RealtimeConversationClose => { handle_realtime_conversation_close(&sess, sub.id.clone()).await; false diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index b6cbb31c4..adc12ec48 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -206,6 +206,8 @@ async fn start_realtime_conversation(codex: &codex_core::CodexThread) -> Result< codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 5fb303cba..ae9a1f25c 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -285,6 +285,8 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -427,6 +429,8 @@ async fn conversation_start_defaults_to_v2_and_gpt_realtime_1_5() -> Result<()> test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -518,6 +522,8 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: Some("session-override-model".to_string()), output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -698,6 +704,8 @@ async fn conversation_webrtc_start_uses_avas_architecture_query() -> Result<()> test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: Some(RealtimeConversationArchitecture::Avas), + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -796,6 +804,8 @@ async fn conversation_webrtc_start_uses_configured_call_base_url_for_avas() -> R test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: Some(RealtimeConversationArchitecture::Avas), + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -886,6 +896,8 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join( test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -973,6 +985,8 @@ async fn conversation_webrtc_sideband_connect_failure_closes_with_error() -> Res test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -1062,6 +1076,8 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -1131,6 +1147,8 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -1224,6 +1242,8 @@ async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Res test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -1271,6 +1291,8 @@ async fn conversation_start_connect_failure_emits_realtime_error_only() -> Resul test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -1366,6 +1388,8 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("old".to_string())), @@ -1392,6 +1416,8 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("new".to_string())), @@ -1489,6 +1515,8 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -1554,6 +1582,8 @@ async fn conversation_uses_default_realtime_backend_prompt() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: None, @@ -1627,6 +1657,8 @@ async fn conversation_uses_empty_instructions_for_null_or_empty_prompt() -> Resu test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt, @@ -1693,6 +1725,8 @@ async fn conversation_uses_explicit_start_voice() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -1751,6 +1785,8 @@ async fn conversation_uses_configured_realtime_voice() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -1797,6 +1833,8 @@ async fn conversation_rejects_voice_for_wrong_realtime_version() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -1844,6 +1882,8 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() -> test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("prompt from op".to_string())), @@ -1917,6 +1957,8 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() - test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("prompt from op".to_string())), @@ -1984,6 +2026,8 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() -> test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("prompt from op".to_string())), @@ -2044,6 +2088,8 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -2158,6 +2204,8 @@ async fn conversation_startup_context_current_thread_selects_many_turns_by_budge codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -2266,6 +2314,8 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<() test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -2326,6 +2376,8 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() -> test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -2407,6 +2459,8 @@ async fn conversation_user_text_turn_is_not_sent_to_realtime() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -2504,6 +2558,8 @@ async fn realtime_v2_noop_tool_call_returns_empty_function_output_without_respon test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -2603,6 +2659,8 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Re test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -2740,6 +2798,8 @@ async fn conversation_handoff_persists_across_item_done_until_turn_complete() -> test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -2892,6 +2952,8 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -2994,6 +3056,8 @@ async fn inbound_handoff_request_uses_active_transcript() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -3097,6 +3161,8 @@ async fn inbound_handoff_request_sends_transcript_delta_after_each_handoff() -> test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -3198,6 +3264,8 @@ async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio( test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -3321,6 +3389,8 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -3474,6 +3544,8 @@ async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> R test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -3616,6 +3688,8 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> { test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), @@ -3769,6 +3843,8 @@ async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio() test.codex .submit(Op::RealtimeConversationStart(ConversationStartParams { architecture: None, + codex_responses_as_items: false, + codex_response_item_prefix: None, model: None, output_modality: RealtimeOutputModality::Audio, prompt: Some(Some("backend prompt".to_string())), diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 6cf6466d0..8f4cb60c4 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -182,6 +182,10 @@ pub struct McpServerRefreshConfig { pub struct ConversationStartParams { /// Overrides the configured realtime architecture for this session only. pub architecture: Option, + /// Sends automatic Codex responses as realtime conversation items instead of handoff appends. + pub codex_responses_as_items: bool, + /// Optional prefix added to automatic Codex response items when `codex_responses_as_items` is set. + pub codex_response_item_prefix: Option, /// Overrides the configured realtime model for this session only. pub model: Option, /// Selects whether the realtime session should produce text or audio output. @@ -407,6 +411,11 @@ pub enum ConversationTextRole { Developer, } +#[derive(Debug, Clone, PartialEq)] +pub struct ConversationSpeechParams { + pub text: String, +} + /// Persistent thread-settings overrides that can be applied before user input or /// on their own. #[derive(Debug, Clone, Default, PartialEq)] @@ -503,6 +512,9 @@ pub enum Op { /// Send text input to the running realtime conversation stream. RealtimeConversationText(ConversationTextParams), + /// Append speakable text to the running realtime conversation stream. + RealtimeConversationSpeech(ConversationSpeechParams), + /// Close the running realtime conversation stream. RealtimeConversationClose, @@ -762,6 +774,7 @@ impl Op { Self::RealtimeConversationStart(_) => "realtime_conversation_start", Self::RealtimeConversationAudio(_) => "realtime_conversation_audio", Self::RealtimeConversationText(_) => "realtime_conversation_text", + Self::RealtimeConversationSpeech(_) => "realtime_conversation_speech", Self::RealtimeConversationClose => "realtime_conversation_close", Self::RealtimeConversationListVoices => "realtime_conversation_list_voices", Self::UserInput { .. } => "user_input",