diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index bd03af8e0..ddc0eae40 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -36,7 +36,10 @@ use codex_app_server_protocol::ThreadRealtimeTranscriptDoneNotification; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::TurnCompletedNotification; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStartedNotification; +use codex_app_server_protocol::UserInput as V2UserInput; use codex_features::FEATURES; use codex_features::Feature; use codex_protocol::protocol::RealtimeConversationVersion; @@ -1342,6 +1345,136 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> Ok(()) } +#[tokio::test] +async fn webrtc_assistant_output_without_handoff_reaches_realtime() -> Result<()> { + skip_if_no_network!(Ok(())); + + let final_answer = "long output ".repeat(1_000); + for (version, expected_version, preamble) in [ + ( + RealtimeTestVersion::V1, + RealtimeConversationVersion::V1, + "direct preamble from v1", + ), + ( + RealtimeTestVersion::V2, + RealtimeConversationVersion::V2, + "direct preamble from v2", + ), + ] { + let mut harness = RealtimeE2eHarness::new( + version, + main_loop_responses(vec![responses::sse(vec![ + responses::ev_response_created("resp-1"), + json!({ + "type": "response.output_item.done", + "item": { + "type": "message", + "role": "assistant", + "id": "msg-preamble", + "phase": "commentary", + "content": [{"type": "output_text", "text": preamble}] + } + }), + responses::ev_assistant_message("msg-final", &final_answer), + responses::ev_completed("resp-1"), + ])]), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![session_updated("sess_standalone_output")], + vec![], + match version { + RealtimeTestVersion::V1 => vec![], + RealtimeTestVersion::V2 => vec![ + json!({ + "type": "response.created", + "response": { "id": "resp_preamble" } + }), + json!({ + "type": "response.done", + "response": { "id": "resp_preamble" } + }), + ], + }, + vec![], + vec![], + ])]), + ) + .await?; + + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!(started.started.version, expected_version); + + let request_id = harness + .mcp + .send_turn_start_request(TurnStartParams { + thread_id: harness.thread_id.clone(), + input: vec![V2UserInput::Text { + text: "direct text turn".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + harness + .mcp + .read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let _: TurnStartResponse = to_response(response)?; + let _ = harness + .read_notification::("turn/completed") + .await?; + + let preamble_request = harness.sideband_outbound_request(/*request_index*/ 1).await; + let output_text = match version { + RealtimeTestVersion::V1 => { + let final_request = harness.sideband_outbound_request(/*request_index*/ 2).await; + assert_eq!( + preamble_request, + json!({ + "type": "conversation.handoff.append", + "handoff_id": "codex", + "output_text": preamble, + }) + ); + assert_eq!(final_request["type"], "conversation.handoff.append"); + assert_eq!(final_request["handoff_id"], "codex"); + final_request["output_text"] + .as_str() + .expect("output text") + .to_string() + } + RealtimeTestVersion::V2 => { + assert_v2_progress_update(&preamble_request, preamble); + assert_v2_response_create( + &harness.sideband_outbound_request(/*request_index*/ 2).await, + ); + let final_request = harness.sideband_outbound_request(/*request_index*/ 3).await; + assert_eq!(final_request["type"], "conversation.item.create"); + assert_eq!(final_request["item"]["type"], "message"); + assert_eq!(final_request["item"]["role"], "user"); + assert_eq!(final_request["item"]["content"][0]["type"], "input_text"); + let output_text = final_request["item"]["content"][0]["text"] + .as_str() + .expect("output text"); + assert!(output_text.starts_with("[BACKEND] ")); + assert_v2_response_create( + &harness.sideband_outbound_request(/*request_index*/ 4).await, + ); + output_text.to_string() + } + }; + assert!(output_text.contains("tokens truncated")); + assert!(output_text.len() <= 4_000); + + harness.shutdown().await; + } + + Ok(()) +} + #[tokio::test] async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index ad26549a2..d9a67c7b9 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -291,6 +291,18 @@ impl RealtimeWebsocketWriter { .await } + pub async fn send_conversation_handoff_append( + &self, + handoff_id: String, + output_text: String, + ) -> Result<(), ApiError> { + self.send_json(&RealtimeOutboundMessage::ConversationHandoffAppend { + handoff_id, + output_text, + }) + .await + } + pub async fn send_conversation_function_call_output( &self, call_id: String, diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 66dc653e5..33568f6f3 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -1,5 +1,6 @@ use crate::client::ModelClient; use crate::realtime_context::build_realtime_startup_context; +use crate::realtime_context::truncate_realtime_text_to_token_budget; use crate::realtime_prompt::prepare_realtime_backend_prompt; use crate::session::session::Session; use anyhow::Context; @@ -64,6 +65,8 @@ const USER_TEXT_IN_QUEUE_CAPACITY: usize = 64; const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64; const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256; const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_300; +const REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET: usize = 1_000; +const STANDALONE_HANDOFF_ID: &str = "codex"; const DEFAULT_REALTIME_MODEL: &str = "gpt-realtime-1.5"; pub(crate) const REALTIME_USER_TEXT_PREFIX: &str = "[USER] "; pub(crate) const REALTIME_BACKEND_TEXT_PREFIX: &str = "[BACKEND] "; @@ -106,6 +109,9 @@ struct RealtimeHandoffState { #[derive(Debug, PartialEq, Eq)] enum HandoffOutput { + StandaloneAssistantOutput { + output_text: String, + }, ProgressUpdate { handoff_id: String, output_text: String, @@ -460,22 +466,38 @@ impl RealtimeConversationManager { state.handoff.clone() }; - let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else { - return Ok(()); + let active_handoff = handoff.active_handoff.lock().await.clone(); + let output = match active_handoff { + Some(handoff_id) => { + let output_text = prefix_realtime_text( + output_text, + REALTIME_BACKEND_TEXT_PREFIX, + handoff.session_kind, + ); + *handoff.last_output_text.lock().await = Some(output_text.clone()); + HandoffOutput::ProgressUpdate { + handoff_id, + output_text, + } + } + None if output_text.trim().is_empty() => return Ok(()), + None => { + let output_text = prefix_realtime_text( + output_text, + REALTIME_BACKEND_TEXT_PREFIX, + handoff.session_kind, + ); + HandoffOutput::StandaloneAssistantOutput { + output_text: truncate_realtime_text_to_token_budget( + &output_text, + REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET, + ), + } + } }; - - let output_text = prefix_realtime_text( - output_text, - REALTIME_BACKEND_TEXT_PREFIX, - handoff.session_kind, - ); - *handoff.last_output_text.lock().await = Some(output_text.clone()); handoff .output_tx - .send(HandoffOutput::ProgressUpdate { - handoff_id, - output_text, - }) + .send(output) .await .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; Ok(()) @@ -511,14 +533,6 @@ impl RealtimeConversationManager { .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string())) } - pub(crate) async fn active_handoff_id(&self) -> Option { - let handoff = { - let guard = self.state.lock().await; - guard.as_ref().map(|state| state.handoff.clone()) - }?; - handoff.active_handoff.lock().await.clone() - } - pub(crate) async fn clear_active_handoff(&self) { let handoff = { let guard = self.state.lock().await; @@ -1188,6 +1202,15 @@ async fn handle_handoff_output( let result = match event_parser { RealtimeEventParser::V1 => match handoff_output { + HandoffOutput::StandaloneAssistantOutput { output_text } => { + // TODO(guinness): Use the new client event for standalone handoffs once the API changes are complete. + writer + .send_conversation_handoff_append( + STANDALONE_HANDOFF_ID.to_string(), + output_text, + ) + .await + } HandoffOutput::ProgressUpdate { handoff_id, output_text, @@ -1202,6 +1225,15 @@ async fn handle_handoff_output( } }, RealtimeEventParser::RealtimeV2 => match handoff_output { + HandoffOutput::StandaloneAssistantOutput { output_text } => { + if let Err(err) = writer.send_conversation_item_create(output_text).await { + Err(err) + } else { + return response_create_queue + .request_create(writer, events_tx, "standalone assistant output") + .await; + } + } HandoffOutput::ProgressUpdate { handoff_id, output_text, diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index ddb31a9a5..8424ce216 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -1796,9 +1796,7 @@ impl Session { let Some(text) = realtime_text_for_event(msg) else { return; }; - if self.conversation.running_state().await.is_none() - || self.conversation.active_handoff_id().await.is_none() - { + if self.conversation.running_state().await.is_none() { return; } if let Err(err) = self.conversation.handoff_out(text).await {