Forward standalone assistant output to realtime (#27319)

## Why

When a realtime session is open without an active frontend-model
handoff, completed Codex assistant messages are currently dropped. That
prevents the frontend model from hearing orchestrator preambles and
final responses produced by typed turns or other non-handoff work, which
makes the two models present as disconnected personas.

Active handoffs already forward each completed assistant message,
including preambles. This change leaves those V1 and V2 paths intact and
fills only the no-active-handoff gap.

## What changed

- Send standalone V1 assistant messages through
`conversation.handoff.append` with a stable synthetic handoff ID
- Send standalone V2 assistant messages as normal `[BACKEND]`
`conversation.item.create` message items, then enqueue `response.create`
so the frontend model responds
- Preserve the existing active V1 and V2 transport and completion
behavior
- Continue excluding user messages from realtime mirroring
- Skip empty output and cap each complete context injection, including
its V2 prefix, at 1,000 tokens
- Add end-to-end coverage for both wire formats, V2 response creation,
preambles, final responses, and truncation

## Test plan

- CI
This commit is contained in:
guinness-oai
2026-06-10 14:32:29 -07:00
committed by GitHub
Unverified
parent 387adc6c4b
commit 22dd6ebc7d
4 changed files with 199 additions and 24 deletions
@@ -36,7 +36,10 @@ use codex_app_server_protocol::ThreadRealtimeTranscriptDoneNotification;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_features::FEATURES;
use codex_features::Feature;
use codex_protocol::protocol::RealtimeConversationVersion;
@@ -1342,6 +1345,136 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()>
Ok(())
}
#[tokio::test]
async fn webrtc_assistant_output_without_handoff_reaches_realtime() -> Result<()> {
skip_if_no_network!(Ok(()));
let final_answer = "long output ".repeat(1_000);
for (version, expected_version, preamble) in [
(
RealtimeTestVersion::V1,
RealtimeConversationVersion::V1,
"direct preamble from v1",
),
(
RealtimeTestVersion::V2,
RealtimeConversationVersion::V2,
"direct preamble from v2",
),
] {
let mut harness = RealtimeE2eHarness::new(
version,
main_loop_responses(vec![responses::sse(vec![
responses::ev_response_created("resp-1"),
json!({
"type": "response.output_item.done",
"item": {
"type": "message",
"role": "assistant",
"id": "msg-preamble",
"phase": "commentary",
"content": [{"type": "output_text", "text": preamble}]
}
}),
responses::ev_assistant_message("msg-final", &final_answer),
responses::ev_completed("resp-1"),
])]),
realtime_sideband(vec![realtime_sideband_connection(vec![
vec![session_updated("sess_standalone_output")],
vec![],
match version {
RealtimeTestVersion::V1 => vec![],
RealtimeTestVersion::V2 => vec![
json!({
"type": "response.created",
"response": { "id": "resp_preamble" }
}),
json!({
"type": "response.done",
"response": { "id": "resp_preamble" }
}),
],
},
vec![],
vec![],
])]),
)
.await?;
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
assert_eq!(started.started.version, expected_version);
let request_id = harness
.mcp
.send_turn_start_request(TurnStartParams {
thread_id: harness.thread_id.clone(),
input: vec![V2UserInput::Text {
text: "direct text turn".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
harness
.mcp
.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let _: TurnStartResponse = to_response(response)?;
let _ = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
let preamble_request = harness.sideband_outbound_request(/*request_index*/ 1).await;
let output_text = match version {
RealtimeTestVersion::V1 => {
let final_request = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_eq!(
preamble_request,
json!({
"type": "conversation.handoff.append",
"handoff_id": "codex",
"output_text": preamble,
})
);
assert_eq!(final_request["type"], "conversation.handoff.append");
assert_eq!(final_request["handoff_id"], "codex");
final_request["output_text"]
.as_str()
.expect("output text")
.to_string()
}
RealtimeTestVersion::V2 => {
assert_v2_progress_update(&preamble_request, preamble);
assert_v2_response_create(
&harness.sideband_outbound_request(/*request_index*/ 2).await,
);
let final_request = harness.sideband_outbound_request(/*request_index*/ 3).await;
assert_eq!(final_request["type"], "conversation.item.create");
assert_eq!(final_request["item"]["type"], "message");
assert_eq!(final_request["item"]["role"], "user");
assert_eq!(final_request["item"]["content"][0]["type"], "input_text");
let output_text = final_request["item"]["content"][0]["text"]
.as_str()
.expect("output text");
assert!(output_text.starts_with("[BACKEND] "));
assert_v2_response_create(
&harness.sideband_outbound_request(/*request_index*/ 4).await,
);
output_text.to_string()
}
};
assert!(output_text.contains("tokens truncated"));
assert!(output_text.len() <= 4_000);
harness.shutdown().await;
}
Ok(())
}
#[tokio::test]
async fn webrtc_v2_forwards_audio_and_text_between_client_and_sideband() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -291,6 +291,18 @@ impl RealtimeWebsocketWriter {
.await
}
pub async fn send_conversation_handoff_append(
&self,
handoff_id: String,
output_text: String,
) -> Result<(), ApiError> {
self.send_json(&RealtimeOutboundMessage::ConversationHandoffAppend {
handoff_id,
output_text,
})
.await
}
pub async fn send_conversation_function_call_output(
&self,
call_id: String,
+53 -21
View File
@@ -1,5 +1,6 @@
use crate::client::ModelClient;
use crate::realtime_context::build_realtime_startup_context;
use crate::realtime_context::truncate_realtime_text_to_token_budget;
use crate::realtime_prompt::prepare_realtime_backend_prompt;
use crate::session::session::Session;
use anyhow::Context;
@@ -64,6 +65,8 @@ const USER_TEXT_IN_QUEUE_CAPACITY: usize = 64;
const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64;
const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256;
const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_300;
const REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET: usize = 1_000;
const STANDALONE_HANDOFF_ID: &str = "codex";
const DEFAULT_REALTIME_MODEL: &str = "gpt-realtime-1.5";
pub(crate) const REALTIME_USER_TEXT_PREFIX: &str = "[USER] ";
pub(crate) const REALTIME_BACKEND_TEXT_PREFIX: &str = "[BACKEND] ";
@@ -106,6 +109,9 @@ struct RealtimeHandoffState {
#[derive(Debug, PartialEq, Eq)]
enum HandoffOutput {
StandaloneAssistantOutput {
output_text: String,
},
ProgressUpdate {
handoff_id: String,
output_text: String,
@@ -460,22 +466,38 @@ impl RealtimeConversationManager {
state.handoff.clone()
};
let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else {
return Ok(());
let active_handoff = handoff.active_handoff.lock().await.clone();
let output = match active_handoff {
Some(handoff_id) => {
let output_text = prefix_realtime_text(
output_text,
REALTIME_BACKEND_TEXT_PREFIX,
handoff.session_kind,
);
*handoff.last_output_text.lock().await = Some(output_text.clone());
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
}
}
None if output_text.trim().is_empty() => return Ok(()),
None => {
let output_text = prefix_realtime_text(
output_text,
REALTIME_BACKEND_TEXT_PREFIX,
handoff.session_kind,
);
HandoffOutput::StandaloneAssistantOutput {
output_text: truncate_realtime_text_to_token_budget(
&output_text,
REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET,
),
}
}
};
let output_text = prefix_realtime_text(
output_text,
REALTIME_BACKEND_TEXT_PREFIX,
handoff.session_kind,
);
*handoff.last_output_text.lock().await = Some(output_text.clone());
handoff
.output_tx
.send(HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
})
.send(output)
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
Ok(())
@@ -511,14 +533,6 @@ impl RealtimeConversationManager {
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))
}
pub(crate) async fn active_handoff_id(&self) -> Option<String> {
let handoff = {
let guard = self.state.lock().await;
guard.as_ref().map(|state| state.handoff.clone())
}?;
handoff.active_handoff.lock().await.clone()
}
pub(crate) async fn clear_active_handoff(&self) {
let handoff = {
let guard = self.state.lock().await;
@@ -1188,6 +1202,15 @@ async fn handle_handoff_output(
let result = match event_parser {
RealtimeEventParser::V1 => match handoff_output {
HandoffOutput::StandaloneAssistantOutput { output_text } => {
// TODO(guinness): Use the new client event for standalone handoffs once the API changes are complete.
writer
.send_conversation_handoff_append(
STANDALONE_HANDOFF_ID.to_string(),
output_text,
)
.await
}
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
@@ -1202,6 +1225,15 @@ async fn handle_handoff_output(
}
},
RealtimeEventParser::RealtimeV2 => match handoff_output {
HandoffOutput::StandaloneAssistantOutput { output_text } => {
if let Err(err) = writer.send_conversation_item_create(output_text).await {
Err(err)
} else {
return response_create_queue
.request_create(writer, events_tx, "standalone assistant output")
.await;
}
}
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
+1 -3
View File
@@ -1796,9 +1796,7 @@ impl Session {
let Some(text) = realtime_text_for_event(msg) else {
return;
};
if self.conversation.running_state().await.is_none()
|| self.conversation.active_handoff_id().await.is_none()
{
if self.conversation.running_state().await.is_none() {
return;
}
if let Err(err) = self.conversation.handoff_out(text).await {