Add realtime speech append control (#27917)

## Why

Realtime voice harness tuning needs app-side control over what backend
Codex text is spoken. Backend orchestrator text is written for a reading
UI, so automatically speaking every preamble, progress update, or final
assistant message can make the realtime voice model too chatty.

For experimentation, clients need two simple controls: keep app/client
text-item injection on the existing item-create path, and add an
explicit speakable path that app code can call only when it wants
realtime to speak. Automatic Codex output also needs an opt-in way to
switch from the protocol's default speakable path to regular realtime
items, with a caller-provided prefix so prompt wording can be tuned
outside core.

The default remains unchanged: if a client omits the new start fields
and never calls `appendSpeech`, automatic backend output continues down
the existing speakable path for the selected realtime protocol.

## What Changed

- Adds experimental `thread/realtime/appendSpeech` for app-provided
speakable text.
- Keeps existing `thread/realtime/appendText` as the item-create API for
app-provided realtime text items.
- Adds `codexResponsesAsItems` / `codex_responses_as_items` on
`thread/realtime/start` to send automatic Codex responses with
`conversation.item.create` instead of the protocol's default speakable
output path.
- Adds `codexResponseItemPrefix` / `codex_response_item_prefix` so
clients can prepend experiment instructions to those automatic Codex
response items.
- Keeps literal `conversation.handoff.append` routing scoped to the v1
speakable path; v2 default speech uses its item/function-output plus
`response.create` behavior.
- Removes the earlier public silent-context API and hardcoded
silent-context prefix.
- Updates realtime tests to cover default automatic speakable behavior,
opt-in automatic item-create behavior, and explicit `appendSpeech`
behavior.

## Validation

- `cargo check -p codex-core -p codex-app-server -p codex-api`
- `just test -p codex-app-server realtime_conversation`
- `just test -p codex-core realtime_conversation` (50/51 passed in the
filtered parallel run; the lone failure passed when rerun in isolation)
- `just test -p codex-core
conversation_mirrors_assistant_message_text_to_realtime_handoff`
- `just test -p codex-api
e2e_connect_and_exchange_events_against_mock_ws_server`
- `just fix -p codex-core`
- `just fix -p codex-app-server`
- `cargo build -p codex-cli`
This commit is contained in:
guinness-oai
2026-06-15 16:15:58 -07:00
committed by GitHub
Unverified
parent 9728992fab
commit 1d8ff89aa3
15 changed files with 783 additions and 220 deletions
@@ -831,6 +831,12 @@ client_request_definitions! {
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeAppendTextResponse,
},
#[experimental("thread/realtime/appendSpeech")]
ThreadRealtimeAppendSpeech => "thread/realtime/appendSpeech" {
params: v2::ThreadRealtimeAppendSpeechParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeAppendSpeechResponse,
},
#[experimental("thread/realtime/stop")]
ThreadRealtimeStop => "thread/realtime/stop" {
params: v2::ThreadRealtimeStopParams,
@@ -3032,6 +3038,8 @@ mod tests {
request_id: RequestId::Integer(9),
params: v2::ThreadRealtimeStartParams {
architecture: Some(RealtimeConversationArchitecture::Avas),
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: "thr_123".to_string(),
model: Some("realtime-treatment-model".to_string()),
output_modality: RealtimeOutputModality::Audio,
@@ -3049,6 +3057,8 @@ mod tests {
"params": {
"architecture": "avas",
"threadId": "thr_123",
"codexResponsesAsItems": null,
"codexResponseItemPrefix": null,
"model": "realtime-treatment-model",
"outputModality": "audio",
"prompt": "You are on a call",
@@ -3069,6 +3079,8 @@ mod tests {
request_id: RequestId::Integer(9),
params: v2::ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: "thr_123".to_string(),
model: None,
output_modality: RealtimeOutputModality::Audio,
@@ -3086,6 +3098,8 @@ mod tests {
"params": {
"architecture": null,
"threadId": "thr_123",
"codexResponsesAsItems": null,
"codexResponseItemPrefix": null,
"model": null,
"outputModality": "audio",
"realtimeSessionId": null,
@@ -3101,6 +3115,8 @@ mod tests {
request_id: RequestId::Integer(9),
params: v2::ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: "thr_123".to_string(),
model: None,
output_modality: RealtimeOutputModality::Audio,
@@ -3118,6 +3134,8 @@ mod tests {
"params": {
"architecture": null,
"threadId": "thr_123",
"codexResponsesAsItems": null,
"codexResponseItemPrefix": null,
"model": null,
"outputModality": "audio",
"prompt": null,
@@ -3166,6 +3184,29 @@ mod tests {
Ok(())
}
#[test]
fn serialize_thread_realtime_append_speech() -> Result<()> {
    // Build the request from its parts so the wire-format expectation below reads on its own.
    let params = v2::ThreadRealtimeAppendSpeechParams {
        thread_id: "thr_123".to_string(),
        text: "Short voice update".to_string(),
    };
    let request = ClientRequest::ThreadRealtimeAppendSpeech {
        request_id: RequestId::Integer(10),
        params,
    };

    // The method name, integer id, and camelCase param keys must all appear on the wire.
    let expected = json!({
        "method": "thread/realtime/appendSpeech",
        "id": 10,
        "params": {
            "threadId": "thr_123",
            "text": "Short voice update"
        }
    });
    let actual = serde_json::to_value(&request)?;

    assert_eq!(expected, actual);
    Ok(())
}
#[test]
fn serialize_thread_status_changed_notification() -> Result<()> {
let notification =
@@ -3276,6 +3317,8 @@ mod tests {
request_id: RequestId::Integer(1),
params: v2::ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: "thr_123".to_string(),
model: None,
output_modality: RealtimeOutputModality::Audio,
@@ -70,6 +70,12 @@ pub struct ThreadRealtimeStartParams {
/// Overrides the configured realtime architecture for this session only.
#[ts(optional = nullable)]
pub architecture: Option<RealtimeConversationArchitecture>,
/// Sends automatic Codex responses as realtime conversation items instead of the protocol's
/// default speakable output path.
#[ts(optional = nullable)]
pub codex_responses_as_items: Option<bool>,
/// Optional prefix added to automatic Codex response items when `codexResponsesAsItems` is true.
#[ts(optional = nullable)]
pub codex_response_item_prefix: Option<String>,
/// Overrides the configured realtime model for this session only.
#[ts(optional = nullable)]
pub model: Option<String>,
@@ -146,6 +152,21 @@ pub struct ThreadRealtimeAppendTextParams {
#[ts(export_to = "v2/")]
pub struct ThreadRealtimeAppendTextResponse {}
/// EXPERIMENTAL - append speakable text to thread realtime.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadRealtimeAppendSpeechParams {
// Thread whose realtime session receives the text; serialized as `threadId`.
pub thread_id: String,
// Text the realtime model should speak to the user.
pub text: String,
}
/// EXPERIMENTAL - response for appending realtime speech.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
// Intentionally field-less: a successful `thread/realtime/appendSpeech` call returns `{}`.
pub struct ThreadRealtimeAppendSpeechResponse {}
/// EXPERIMENTAL - stop thread realtime.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
+11 -1
View File
@@ -165,9 +165,10 @@ Example with notification opt-out:
- `thread/inject_items` — append raw Responses API items to a loaded thread's model-visible history without starting a user turn; returns `{}` on success.
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. `clientUserMessageId` is optional; when supplied, the corresponding `userMessage` item echoes it as `clientId`. Review and manual compaction turns reject `turn/steer`.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); pass `outputModality: "text"` or `outputModality: "audio"` to choose model output, and optionally pass `model` and `version` to override configured realtime selection for this session only. Returns `{}` and streams `thread/realtime/*` notifications. Omit `transport` for the websocket transport, or pass `{ "type": "webrtc", "sdp": "..." }` to create a WebRTC session from a browser-generated SDP offer; the remote answer SDP is emitted as `thread/realtime/sdp`.
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); pass `outputModality: "text"` or `outputModality: "audio"` to choose model output, and optionally pass `model` and `version` to override configured realtime selection for this session only. By default, automatic Codex text follows the protocol's speakable output path. Pass `codexResponsesAsItems: true` to send automatic Codex responses as realtime conversation items instead, and optionally pass `codexResponseItemPrefix` to prepend experiment instructions to those items. Returns `{}` and streams `thread/realtime/*` notifications. Omit `transport` for the websocket transport, or pass `{ "type": "webrtc", "sdp": "..." }` to create a WebRTC session from a browser-generated SDP offer; the remote answer SDP is emitted as `thread/realtime/sdp`.
- `thread/realtime/appendAudio` — append an input audio chunk to the active realtime session (experimental); returns `{}`.
- `thread/realtime/appendText` — append text input to the active realtime session with a required `role` of `user` or `developer` (experimental); returns `{}`. Older clients that omit `role` default to `user`.
- `thread/realtime/appendSpeech` — append text that the realtime model should speak to the user (experimental); returns `{}`.
- `thread/realtime/stop` — stop the active realtime session for the thread (experimental); returns `{}`.
- `review/start` — kick off Codex's automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.
- `command/exec` — run a single command under the server sandbox without starting a thread/turn (handy for utilities and validation).
@@ -878,6 +879,15 @@ Omit `prompt` to use Codex's default realtime backend prompt. Send `prompt: null
`prompt: ""` when the session should start without that default backend prompt.
Clients may also pass `model` and `version` on `thread/realtime/start` to select a
different realtime session configuration without changing thread or user config.
Pass `codexResponsesAsItems: true` to inject automatic Codex responses with
`conversation.item.create` instead of the protocol's default speakable output
path. When using that mode, `codexResponseItemPrefix` can prepend short
experiment instructions to each automatic Codex response item. Omit
`codexResponsesAsItems`, or pass `false`, to preserve the default speakable
behavior. Call
`thread/realtime/appendText` to append app-provided realtime text items, or
`thread/realtime/appendSpeech` when the app decides a realtime update should be
spoken.
```javascript
await pc.setRemoteDescription({
@@ -1317,6 +1317,11 @@ impl MessageProcessor {
.thread_realtime_append_text(&request_id, params)
.await
}
ClientRequest::ThreadRealtimeAppendSpeech { params, .. } => {
self.turn_processor
.thread_realtime_append_speech(&request_id, params)
.await
}
ClientRequest::ThreadRealtimeStop { params, .. } => {
self.turn_processor
.thread_realtime_stop(&request_id, params)
@@ -225,6 +225,8 @@ use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::ThreadRealtimeAppendAudioParams;
use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse;
use codex_app_server_protocol::ThreadRealtimeAppendSpeechParams;
use codex_app_server_protocol::ThreadRealtimeAppendSpeechResponse;
use codex_app_server_protocol::ThreadRealtimeAppendTextParams;
use codex_app_server_protocol::ThreadRealtimeAppendTextResponse;
use codex_app_server_protocol::ThreadRealtimeListVoicesResponse;
@@ -397,6 +399,7 @@ use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::ConversationSpeechParams;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::ConversationStartTransport;
use codex_protocol::protocol::ConversationTextParams;
@@ -182,6 +182,16 @@ impl TurnRequestProcessor {
.map(|response| response.map(Into::into))
}
/// Handles `thread/realtime/appendSpeech` by delegating to
/// `thread_realtime_append_speech_inner` and converting a produced response
/// into the generic client payload type.
///
/// Errors from the inner handler are returned unchanged; a `None` response is
/// passed through as `None`.
pub(crate) async fn thread_realtime_append_speech(
    &self,
    request_id: &ConnectionRequestId,
    params: ThreadRealtimeAppendSpeechParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
    let typed_response = self
        .thread_realtime_append_speech_inner(request_id, params)
        .await?;
    Ok(typed_response.map(Into::into))
}
pub(crate) async fn thread_realtime_stop(
&self,
request_id: &ConnectionRequestId,
@@ -942,6 +952,8 @@ impl TurnRequestProcessor {
thread.as_ref(),
Op::RealtimeConversationStart(ConversationStartParams {
architecture: params.architecture,
codex_responses_as_items: params.codex_responses_as_items.unwrap_or(false),
codex_response_item_prefix: params.codex_response_item_prefix,
model: params.model,
output_modality: params.output_modality,
prompt: params.prompt,
@@ -1018,6 +1030,31 @@ impl TurnRequestProcessor {
Ok(Some(ThreadRealtimeAppendTextResponse::default()))
}
/// Submits app-provided speakable text to the thread's realtime conversation.
///
/// Returns `Ok(None)` when `prepare_realtime_conversation_thread` yields no
/// thread, and `Ok(Some(..))` with an empty response once the
/// `RealtimeConversationSpeech` op has been submitted.
///
/// # Errors
///
/// Propagates any error from thread preparation, and maps a failed op
/// submission to an internal JSON-RPC error.
async fn thread_realtime_append_speech_inner(
    &self,
    request_id: &ConnectionRequestId,
    params: ThreadRealtimeAppendSpeechParams,
) -> Result<Option<ThreadRealtimeAppendSpeechResponse>, JSONRPCErrorError> {
    let Some((_, thread)) = self
        .prepare_realtime_conversation_thread(request_id, &params.thread_id)
        .await?
    else {
        // Nothing to respond with from here. NOTE(review): presumably the preparation step
        // already reported the failure to the client — confirm against that helper.
        return Ok(None);
    };

    let speech_op = Op::RealtimeConversationSpeech(ConversationSpeechParams { text: params.text });
    let submitted = self
        .submit_core_op(request_id, thread.as_ref(), speech_op)
        .await;
    if let Err(err) = submitted {
        return Err(internal_error(format!(
            "failed to append realtime conversation speech: {err}"
        )));
    }

    Ok(Some(ThreadRealtimeAppendSpeechResponse::default()))
}
async fn thread_realtime_stop_inner(
&self,
request_id: &ConnectionRequestId,
@@ -90,6 +90,7 @@ use codex_app_server_protocol::ThreadMemoryModeSetParams;
use codex_app_server_protocol::ThreadMetadataUpdateParams;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadRealtimeAppendAudioParams;
use codex_app_server_protocol::ThreadRealtimeAppendSpeechParams;
use codex_app_server_protocol::ThreadRealtimeAppendTextParams;
use codex_app_server_protocol::ThreadRealtimeListVoicesParams;
use codex_app_server_protocol::ThreadRealtimeStartParams;
@@ -1036,6 +1037,16 @@ impl TestAppServer {
.await
}
/// Send a `thread/realtime/appendSpeech` JSON-RPC request (v2).
///
/// Serializes `params` to JSON and forwards them through `send_request`,
/// returning whatever request id `send_request` yields.
pub async fn send_thread_realtime_append_speech_request(
    &mut self,
    params: ThreadRealtimeAppendSpeechParams,
) -> anyhow::Result<i64> {
    let payload = serde_json::to_value(params)?;
    self.send_request("thread/realtime/appendSpeech", Some(payload))
        .await
}
/// Send a `thread/realtime/stop` JSON-RPC request (v2).
pub async fn send_thread_realtime_stop_request(
&mut self,
@@ -80,6 +80,8 @@ async fn realtime_conversation_start_requires_experimental_api_capability() -> R
let request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: "thr_123".to_string(),
model: None,
output_modality: RealtimeOutputModality::Audio,
@@ -189,6 +191,8 @@ async fn realtime_webrtc_start_requires_experimental_api_capability() -> Result<
let request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: "thr_123".to_string(),
model: None,
output_modality: RealtimeOutputModality::Audio,
@@ -15,6 +15,8 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadRealtimeAppendAudioParams;
use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse;
use codex_app_server_protocol::ThreadRealtimeAppendSpeechParams;
use codex_app_server_protocol::ThreadRealtimeAppendSpeechResponse;
use codex_app_server_protocol::ThreadRealtimeAppendTextParams;
use codex_app_server_protocol::ThreadRealtimeAppendTextResponse;
use codex_app_server_protocol::ThreadRealtimeAudioChunk;
@@ -82,6 +84,8 @@ const V2_STEERING_ACKNOWLEDGEMENT: &str =
"This was sent to steer the previous background agent task.";
const V2_HANDOFF_COMPLETE_ACKNOWLEDGEMENT: &str =
"Background agent finished. Use the preceding [BACKEND] messages as the result.";
// Prefix the harness passes as `codex_response_item_prefix` when it starts realtime with
// `codex_responses_as_items: Some(true)`. Tests in this file expect automatic Codex response
// items to begin with this text followed by a blank line.
const RESPONSE_ITEM_PREFIX: &str =
"Use the following context to inform future responses, but do not speak it to the user.";
#[derive(Debug, Clone, Copy)]
enum StartupContextConfig<'a> {
@@ -309,6 +313,28 @@ impl RealtimeE2eHarness {
}
/// Starts WebRTC realtime with `codex_responses_as_items` left unset, i.e. without opting in
/// to sending automatic Codex responses as items.
async fn start_webrtc_realtime(&mut self, offer_sdp: &str) -> Result<StartedWebrtcRealtime> {
    let codex_responses_as_items = None;
    self.start_webrtc_realtime_with_codex_responses_as_items(offer_sdp, codex_responses_as_items)
        .await
}
/// Starts WebRTC realtime with `codex_responses_as_items` set to `Some(true)`, opting in to
/// automatic Codex responses being sent as realtime items.
async fn start_webrtc_realtime_with_codex_response_items(
    &mut self,
    offer_sdp: &str,
) -> Result<StartedWebrtcRealtime> {
    let codex_responses_as_items = Some(true);
    self.start_webrtc_realtime_with_codex_responses_as_items(offer_sdp, codex_responses_as_items)
        .await
}
async fn start_webrtc_realtime_with_codex_responses_as_items(
&mut self,
offer_sdp: &str,
codex_responses_as_items: Option<bool>,
) -> Result<StartedWebrtcRealtime> {
// Starts realtime through the public JSON-RPC method, then waits for the same client-visible
// notifications a desktop app needs: started first, SDP answer second.
let start_request_id = self
@@ -316,6 +342,10 @@ impl RealtimeE2eHarness {
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
architecture: None,
thread_id: self.thread_id.clone(),
codex_response_item_prefix: codex_responses_as_items
.unwrap_or(false)
.then(|| RESPONSE_ITEM_PREFIX.to_string()),
codex_responses_as_items,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -407,6 +437,24 @@ impl RealtimeE2eHarness {
Ok(())
}
/// Sends `thread/realtime/appendSpeech` for `thread_id` with `text` and waits, bounded by
/// `DEFAULT_TIMEOUT`, for the matching JSON-RPC response.
///
/// Fails if the request cannot be sent, the wait times out, reading the stream errors, or the
/// response does not decode as `ThreadRealtimeAppendSpeechResponse`.
async fn append_speech(&mut self, thread_id: String, text: &str) -> Result<()> {
    let params = ThreadRealtimeAppendSpeechParams {
        thread_id,
        text: text.to_string(),
    };
    let request_id = self
        .mcp
        .send_thread_realtime_append_speech_request(params)
        .await?;

    // Outer `?` handles the timeout, inner `?` handles the stream read itself.
    let pending_response = self
        .mcp
        .read_stream_until_response_message(RequestId::Integer(request_id));
    let response: JSONRPCResponse = timeout(DEFAULT_TIMEOUT, pending_response).await??;

    // Decode only to validate the response shape; the value itself is unused.
    let _: ThreadRealtimeAppendSpeechResponse = to_response(response)?;
    Ok(())
}
// Returns the result of `responses_requests` for this harness's main-loop Responses mock
// server, as JSON values.
async fn main_loop_responses_requests(&self) -> Result<Vec<Value>> {
responses_requests(&self.main_loop_responses_server).await
}
@@ -564,6 +612,8 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: thread_start.thread.id.clone(),
model: Some("realtime-treatment-model".to_string()),
output_modality: RealtimeOutputModality::Audio,
@@ -840,6 +890,8 @@ async fn realtime_text_output_modality_requests_text_output_and_final_transcript
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: thread_start.thread.id.clone(),
model: None,
output_modality: RealtimeOutputModality::Text,
@@ -1017,6 +1069,8 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: thread_start.thread.id.clone(),
model: None,
output_modality: RealtimeOutputModality::Audio,
@@ -1117,6 +1171,8 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: thread_id.clone(),
model: None,
output_modality: RealtimeOutputModality::Audio,
@@ -1291,7 +1347,64 @@ async fn webrtc_v1_start_posts_offer_returns_sdp_and_joins_sideband() -> Result<
}
#[tokio::test]
async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()> {
async fn webrtc_v1_default_automatic_output_uses_handoff_append() -> Result<()> {
// Checks the default (no `codexResponsesAsItems`) behavior on realtime v1: the final assistant
// message of a regular turn is sent over the sideband as `conversation.handoff.append`.
skip_if_no_network!(Ok(()));
// Script one Responses turn ending in a final assistant message, plus one sideband connection.
// NOTE(review): the trailing empty vecs presumably are the (empty) server replies scripted for
// the later outbound sideband requests — confirm against `realtime_sideband_connection`.
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V1,
main_loop_responses(vec![create_final_assistant_message_sse_response(
"legacy automatic speech",
)?]),
realtime_sideband(vec![realtime_sideband_connection(vec![
vec![session_updated("sess_v1_default_handoff")],
vec![],
vec![],
])]),
)
.await?;
// Start realtime without opting in to item mode.
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
assert_eq!(started.started.version, RealtimeConversationVersion::V1);
// Outbound sideband request 0 is checked as the v1 session update.
assert_v1_session_update(&harness.sideband_outbound_request(/*request_index*/ 0).await)?;
// Run a regular text turn and wait for both its response and its completion notification.
let turn_request_id = harness
.mcp
.send_turn_start_request(TurnStartParams {
thread_id: harness.thread_id.clone(),
input: vec![V2UserInput::Text {
text: "say the default output".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
harness
.mcp
.read_stream_until_response_message(RequestId::Integer(turn_request_id)),
)
.await??;
let _: TurnStartResponse = to_response(turn_response)?;
let _ = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
// Outbound sideband request 1 must be the handoff append carrying the assistant text verbatim.
assert_eq!(
harness.sideband_outbound_request(/*request_index*/ 1).await,
json!({
"type": "conversation.handoff.append",
"handoff_id": "codex",
"output_text": "legacy automatic speech",
})
);
harness.shutdown().await;
Ok(())
}
#[tokio::test]
async fn webrtc_v1_handoff_request_delegates_context_and_manual_append_speaks() -> Result<()> {
skip_if_no_network!(Ok(()));
// Phase 1: script one v1 handoff request on the sideband and one delegated Responses turn.
@@ -1323,11 +1436,14 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()>
}),
],
vec![],
vec![],
])]),
)
.await?;
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
let started = harness
.start_webrtc_realtime_with_codex_response_items("v=offer\r\n")
.await?;
assert_eq!(started.started.version, RealtimeConversationVersion::V1);
assert_call_create_multipart(
harness.call_capture.single_request(),
@@ -1346,8 +1462,8 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()>
.await?;
assert_eq!(turn_completed.thread_id, harness.thread_id);
// Phase 3: assert the delegated prompt went to Responses, then the v1 handoff append went back
// over the existing sideband connection.
// Phase 3: assert the delegated prompt went to Responses, then the automatic v1 output went
// back over the existing sideband connection as a conversation item.
let requests = harness.main_loop_responses_requests().await?;
assert_eq!(requests.len(), 1);
assert!(
@@ -1358,13 +1474,32 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()>
"delegated Responses request should contain realtime delegation envelope: {}",
requests[0]
);
let handoff_append = harness.sideband_outbound_request(/*request_index*/ 1).await;
let context_update = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_eq!(
handoff_append,
context_update,
json!({
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "developer",
"content": [{
"type": "input_text",
"text": format!("{RESPONSE_ITEM_PREFIX}\n\ndelegated from v1")
}]
}
})
);
harness
.append_speech(harness.thread_id.clone(), "manual spoken v1 update")
.await?;
let spoken_append = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_eq!(
spoken_append,
json!({
"type": "conversation.handoff.append",
"handoff_id": "handoff_v1",
"output_text": "\"Agent Final Message\":\n\ndelegated from v1",
"handoff_id": "codex",
"output_text": "manual spoken v1 update",
})
);
@@ -1373,131 +1508,234 @@ async fn webrtc_v1_handoff_request_delegates_and_appends_result() -> Result<()>
}
#[tokio::test]
async fn webrtc_assistant_output_without_handoff_reaches_realtime() -> Result<()> {
async fn realtime_automatic_standalone_output_is_item_and_append_speaks() -> Result<()> {
// Checks realtime v2 with item mode enabled: automatic output from a regular turn is sent as a
// backend item with no follow-up realtime request, while an explicit `appendSpeech` call
// produces a progress update followed by a response-create request.
skip_if_no_network!(Ok(()));
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V2,
main_loop_responses(vec![create_final_assistant_message_sse_response(
"automatic output",
)?]),
realtime_sideband(vec![realtime_sideband_connection(vec![
vec![session_updated("sess_manual_handoff")],
vec![],
vec![],
vec![],
])]),
)
.await?;
// Start realtime with `codex_responses_as_items: Some(true)` and the test prefix.
let started = harness
.start_webrtc_realtime_with_codex_response_items("v=offer\r\n")
.await?;
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
assert_eq!(
harness.sideband_outbound_request(/*request_index*/ 0).await["type"].as_str(),
Some("session.update")
);
// Run a regular text turn and wait for its response and completion notification.
let turn_request_id = harness
.mcp
.send_turn_start_request(TurnStartParams {
thread_id: harness.thread_id.clone(),
input: vec![V2UserInput::Text {
text: "do something quietly".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
harness
.mcp
.read_stream_until_response_message(RequestId::Integer(turn_request_id)),
)
.await??;
let _: TurnStartResponse = to_response(turn_response)?;
let _ = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
// Outbound sideband request 1 is the automatic output, checked as a backend item update.
assert_v2_backend_item_update(
&harness.sideband_outbound_request(/*request_index*/ 1).await,
"automatic output",
);
// Negative check: no outbound request at index 2 may arrive within 200ms, so this wait is
// expected to time out.
let automatic_response_create = timeout(
Duration::from_millis(200),
harness
.realtime_server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 2),
)
.await;
assert!(
automatic_response_create.is_err(),
"automatic item should not request a realtime response"
);
// An explicit speech append is what fills index 2 (progress update) and index 3
// (response create).
harness
.append_speech(harness.thread_id.clone(), "manual voice update")
.await?;
assert_v2_progress_update(
&harness.sideband_outbound_request(/*request_index*/ 2).await,
"manual voice update",
);
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 3).await);
harness.shutdown().await;
Ok(())
}
#[tokio::test]
async fn realtime_automatic_handoff_output_is_item_and_append_speaks() -> Result<()> {
// Checks realtime v2 with item mode enabled when the sideband scripts a background-agent tool
// call: the automatic final response is sent as a backend item plus an empty function-call
// output with no follow-up realtime request, while an explicit `appendSpeech` call produces a
// progress update followed by a response-create request.
skip_if_no_network!(Ok(()));
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V2,
main_loop_responses(vec![create_final_assistant_message_sse_response(
"automatic final response",
)?]),
realtime_sideband(vec![realtime_sideband_connection(vec![
vec![
session_updated("sess_manual_update"),
v2_background_agent_tool_call("call_quiet", "delegate quietly"),
],
vec![],
vec![],
vec![],
vec![],
])]),
)
.await?;
let started = harness
.start_webrtc_realtime_with_codex_response_items("v=offer\r\n")
.await?;
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
assert_eq!(
harness.sideband_outbound_request(/*request_index*/ 0).await["type"].as_str(),
Some("session.update")
);
// The test never sends `turn/start` itself. NOTE(review): presumably the scripted tool call
// is what starts this turn — confirm against the harness.
let turn_started = harness
.read_notification::<TurnStartedNotification>("turn/started")
.await?;
assert_eq!(turn_started.thread_id, harness.thread_id);
let turn_completed = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
assert_eq!(turn_completed.thread_id, harness.thread_id);
// Outbound sideband request 1 is the automatic final response, checked as a backend item
// update.
assert_v2_backend_item_update(
&harness.sideband_outbound_request(/*request_index*/ 1).await,
"automatic final response",
);
// Outbound sideband request 2 is the function-call output for `call_quiet`, expected with an
// empty output string.
assert_v2_function_call_output(
&harness.sideband_outbound_request(/*request_index*/ 2).await,
"call_quiet",
"",
);
// Negative check: no outbound request at index 3 may arrive within 200ms, so this wait is
// expected to time out.
let automatic_response_create = timeout(
Duration::from_millis(200),
harness
.realtime_server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 3),
)
.await;
assert!(
automatic_response_create.is_err(),
"automatic handoff item should not request a realtime response"
);
// An explicit speech append is what fills index 3 (progress update) and index 4
// (response create).
harness
.append_speech(harness.thread_id.clone(), "manual spoken update")
.await?;
assert_v2_progress_update(
&harness.sideband_outbound_request(/*request_index*/ 3).await,
"manual spoken update",
);
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 4).await);
harness.shutdown().await;
Ok(())
}
#[tokio::test]
async fn webrtc_v2_assistant_output_without_handoff_reaches_realtime_context() -> Result<()> {
skip_if_no_network!(Ok(()));
let final_answer = "long output ".repeat(1_000);
for (version, expected_version, preamble) in [
(
RealtimeTestVersion::V1,
RealtimeConversationVersion::V1,
"direct preamble from v1",
),
(
RealtimeTestVersion::V2,
RealtimeConversationVersion::V2,
"direct preamble from v2",
),
] {
let mut harness = RealtimeE2eHarness::new(
version,
main_loop_responses(vec![responses::sse(vec![
responses::ev_response_created("resp-1"),
json!({
"type": "response.output_item.done",
"item": {
"type": "message",
"role": "assistant",
"id": "msg-preamble",
"phase": "commentary",
"content": [{"type": "output_text", "text": preamble}]
}
}),
responses::ev_assistant_message("msg-final", &final_answer),
responses::ev_completed("resp-1"),
])]),
realtime_sideband(vec![realtime_sideband_connection(vec![
vec![session_updated("sess_standalone_output")],
vec![],
match version {
RealtimeTestVersion::V1 => vec![],
RealtimeTestVersion::V2 => vec![
json!({
"type": "response.created",
"response": { "id": "resp_preamble" }
}),
json!({
"type": "response.done",
"response": { "id": "resp_preamble" }
}),
],
},
vec![],
vec![],
])]),
)
let preamble = "direct preamble from v2";
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V2,
main_loop_responses(vec![responses::sse(vec![
responses::ev_response_created("resp-1"),
json!({
"type": "response.output_item.done",
"item": {
"type": "message",
"role": "assistant",
"id": "msg-preamble",
"phase": "commentary",
"content": [{"type": "output_text", "text": preamble}]
}
}),
responses::ev_assistant_message("msg-final", &final_answer),
responses::ev_completed("resp-1"),
])]),
realtime_sideband(vec![realtime_sideband_connection(vec![
vec![session_updated("sess_standalone_output")],
vec![],
vec![],
])]),
)
.await?;
let started = harness
.start_webrtc_realtime_with_codex_response_items("v=offer\r\n")
.await?;
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
let request_id = harness
.mcp
.send_turn_start_request(TurnStartParams {
thread_id: harness.thread_id.clone(),
input: vec![V2UserInput::Text {
text: "direct text turn".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
harness
.mcp
.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let _: TurnStartResponse = to_response(response)?;
let _ = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
assert_eq!(started.started.version, expected_version);
assert_v2_backend_item_update(
&harness.sideband_outbound_request(/*request_index*/ 1).await,
preamble,
);
let final_request = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_eq!(final_request["type"], "conversation.item.create");
assert_eq!(final_request["item"]["type"], "message");
assert_eq!(final_request["item"]["role"], "developer");
assert_eq!(final_request["item"]["content"][0]["type"], "input_text");
let output_text = final_request["item"]["content"][0]["text"]
.as_str()
.expect("output text");
assert!(output_text.starts_with(&format!("{RESPONSE_ITEM_PREFIX}\n\n[BACKEND] ")));
assert!(output_text.contains("tokens truncated"));
assert!(output_text.len() <= 4_000);
let request_id = harness
.mcp
.send_turn_start_request(TurnStartParams {
thread_id: harness.thread_id.clone(),
input: vec![V2UserInput::Text {
text: "direct text turn".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
harness
.mcp
.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let _: TurnStartResponse = to_response(response)?;
let _ = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
let preamble_request = harness.sideband_outbound_request(/*request_index*/ 1).await;
let output_text = match version {
RealtimeTestVersion::V1 => {
let final_request = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_eq!(
preamble_request,
json!({
"type": "conversation.handoff.append",
"handoff_id": "codex",
"output_text": preamble,
})
);
assert_eq!(final_request["type"], "conversation.handoff.append");
assert_eq!(final_request["handoff_id"], "codex");
final_request["output_text"]
.as_str()
.expect("output text")
.to_string()
}
RealtimeTestVersion::V2 => {
assert_v2_progress_update(&preamble_request, preamble);
assert_v2_response_create(
&harness.sideband_outbound_request(/*request_index*/ 2).await,
);
let final_request = harness.sideband_outbound_request(/*request_index*/ 3).await;
assert_eq!(final_request["type"], "conversation.item.create");
assert_eq!(final_request["item"]["type"], "message");
assert_eq!(final_request["item"]["role"], "user");
assert_eq!(final_request["item"]["content"][0]["type"], "input_text");
let output_text = final_request["item"]["content"][0]["text"]
.as_str()
.expect("output text");
assert!(output_text.starts_with("[BACKEND] "));
assert_v2_response_create(
&harness.sideband_outbound_request(/*request_index*/ 4).await,
);
output_text.to_string()
}
};
assert!(output_text.contains("tokens truncated"));
assert!(output_text.len() <= 4_000);
harness.shutdown().await;
}
harness.shutdown().await;
Ok(())
}
@@ -1807,14 +2045,6 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out
let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_v2_function_call_output(&tool_output, "call_v2", V2_HANDOFF_COMPLETE_ACKNOWLEDGEMENT);
assert_eq!(
function_call_output_sideband_requests(&harness.realtime_server).len(),
1
);
// Phase 4: after the final function-call output, realtime needs an explicit
// `response.create` to produce the next user-visible response.
assert_v2_response_create(&harness.sideband_outbound_request(/*request_index*/ 3).await);
harness.shutdown().await;
Ok(())
@@ -2036,10 +2266,6 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
"call_shell",
V2_HANDOFF_COMPLETE_ACKNOWLEDGEMENT,
);
assert_eq!(
function_call_output_sideband_requests(&harness.realtime_server).len(),
1
);
harness.shutdown().await;
Ok(())
@@ -2165,6 +2391,8 @@ async fn realtime_webrtc_start_surfaces_backend_error() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: thread_start.thread.id,
model: None,
output_modality: RealtimeOutputModality::Audio,
@@ -2227,6 +2455,8 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> {
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
architecture: None,
codex_responses_as_items: None,
codex_response_item_prefix: None,
thread_id: thread_start.thread.id.clone(),
model: None,
output_modality: RealtimeOutputModality::Audio,
@@ -2350,18 +2580,6 @@ fn realtime_tool_ok_command() -> Vec<String> {
}
}
/// Returns the JSON bodies of every request on the server's single connection
/// that is a `conversation.item.create` carrying a `function_call_output` item.
fn function_call_output_sideband_requests(server: &WebSocketTestServer) -> Vec<Value> {
    let mut function_call_outputs = Vec::new();
    for request in server.single_connection().iter() {
        let body = WebSocketRequest::body_json(request);
        let is_function_call_output = body["type"] == "conversation.item.create"
            && body["item"]["type"] == "function_call_output";
        if is_function_call_output {
            function_call_outputs.push(body);
        }
    }
    function_call_outputs
}
fn assert_v2_function_call_output(request: &Value, call_id: &str, expected_output: &str) {
assert_eq!(
request,
@@ -2393,6 +2611,27 @@ fn assert_v2_progress_update(request: &Value, expected_text: &str) {
);
}
/// Asserts that `request` is the item-create update for backend text, i.e.
/// `expected_text` tagged with the `[BACKEND] ` marker.
fn assert_v2_backend_item_update(request: &Value, expected_text: &str) {
    let backend_text = format!("[BACKEND] {expected_text}");
    assert_v2_items_update(request, &backend_text);
}
fn assert_v2_items_update(request: &Value, expected_text: &str) {
assert_eq!(
request,
&json!({
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "developer",
"content": [{
"type": "input_text",
"text": format!("{RESPONSE_ITEM_PREFIX}\n\n{expected_text}")
}]
}
})
);
}
fn assert_v2_user_text_item(request: &Value, expected_text: &str) {
assert_eq!(
request,
+156 -67
View File
@@ -32,6 +32,7 @@ use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::ConversationSpeechParams;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::ConversationStartTransport;
use codex_protocol::protocol::ConversationTextParams;
@@ -103,25 +104,21 @@ enum RealtimeSessionKind {
#[derive(Clone, Debug)]
struct RealtimeHandoffState {
output_tx: Sender<HandoffOutput>,
output_tx: Sender<RealtimeOutbound>,
active_handoff: Arc<Mutex<Option<String>>>,
last_output_text: Arc<Mutex<Option<String>>>,
codex_responses_as_items: bool,
codex_response_item_prefix: Option<String>,
session_kind: RealtimeSessionKind,
}
#[derive(Debug, PartialEq, Eq)]
enum HandoffOutput {
StandaloneAssistantOutput {
output_text: String,
},
ProgressUpdate {
handoff_id: String,
output_text: String,
},
FinalUpdate {
handoff_id: String,
output_text: String,
},
enum RealtimeOutbound {
StandaloneHandoff { text: String },
HandoffUpdate { handoff_id: String, text: String },
CompletedHandoff { handoff_id: String, text: String },
ConversationItem { text: String },
HandoffCompleteAck { handoff_id: String },
}
#[derive(Debug, PartialEq, Eq)]
@@ -196,7 +193,7 @@ struct RealtimeInputTask {
writer: RealtimeWebsocketWriter,
events: RealtimeWebsocketEvents,
text_rx: Receiver<ConversationTextParams>,
handoff_output_rx: Receiver<HandoffOutput>,
handoff_output_rx: Receiver<RealtimeOutbound>,
audio_rx: Receiver<RealtimeAudioFrame>,
events_tx: Sender<RealtimeEvent>,
handoff_state: RealtimeHandoffState,
@@ -206,16 +203,23 @@ struct RealtimeInputTask {
struct RealtimeInputChannels {
text_rx: Receiver<ConversationTextParams>,
handoff_output_rx: Receiver<HandoffOutput>,
handoff_output_rx: Receiver<RealtimeOutbound>,
audio_rx: Receiver<RealtimeAudioFrame>,
}
impl RealtimeHandoffState {
fn new(output_tx: Sender<HandoffOutput>, session_kind: RealtimeSessionKind) -> Self {
fn new(
output_tx: Sender<RealtimeOutbound>,
codex_responses_as_items: bool,
codex_response_item_prefix: Option<String>,
session_kind: RealtimeSessionKind,
) -> Self {
Self {
output_tx,
active_handoff: Arc::new(Mutex::new(None)),
last_output_text: Arc::new(Mutex::new(None)),
codex_responses_as_items,
codex_response_item_prefix,
session_kind,
}
}
@@ -236,6 +240,8 @@ struct RealtimeStart {
api_provider: ApiProvider,
architecture: RealtimeConversationArchitecture,
extra_headers: Option<HeaderMap>,
codex_responses_as_items: bool,
codex_response_item_prefix: Option<String>,
realtime_call_api_provider: Option<ApiProvider>,
session_config: RealtimeSessionConfig,
model_client: ModelClient,
@@ -290,6 +296,8 @@ impl RealtimeConversationManager {
api_provider,
architecture,
extra_headers,
codex_responses_as_items,
codex_response_item_prefix,
realtime_call_api_provider,
session_config,
model_client,
@@ -306,12 +314,17 @@ impl RealtimeConversationManager {
let (text_tx, text_rx) =
async_channel::bounded::<ConversationTextParams>(TEXT_IN_QUEUE_CAPACITY);
let (handoff_output_tx, handoff_output_rx) =
async_channel::bounded::<HandoffOutput>(HANDOFF_OUT_QUEUE_CAPACITY);
async_channel::bounded::<RealtimeOutbound>(HANDOFF_OUT_QUEUE_CAPACITY);
let (events_tx, events_rx) =
async_channel::bounded::<RealtimeEvent>(OUTPUT_EVENTS_QUEUE_CAPACITY);
let realtime_active = Arc::new(AtomicBool::new(true));
let handoff = RealtimeHandoffState::new(handoff_output_tx, session_kind);
let handoff = RealtimeHandoffState::new(
handoff_output_tx,
codex_responses_as_items,
codex_response_item_prefix,
session_kind,
);
let input_channels = RealtimeInputChannels {
text_rx,
handoff_output_rx,
@@ -480,29 +493,34 @@ impl RealtimeConversationManager {
let active_handoff = handoff.active_handoff.lock().await.clone();
let output = match active_handoff {
Some(handoff_id) => {
let output_text = prefix_realtime_text(
output_text,
REALTIME_BACKEND_TEXT_PREFIX,
handoff.session_kind,
);
let output_text = realtime_backend_output(output_text, handoff.session_kind);
*handoff.last_output_text.lock().await = Some(output_text.clone());
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
if handoff.codex_responses_as_items {
RealtimeOutbound::ConversationItem {
text: realtime_backend_item(
output_text,
handoff.codex_response_item_prefix.as_deref(),
),
}
} else {
RealtimeOutbound::HandoffUpdate {
handoff_id,
text: output_text,
}
}
}
None if output_text.trim().is_empty() => return Ok(()),
None => {
let output_text = prefix_realtime_text(
output_text,
REALTIME_BACKEND_TEXT_PREFIX,
handoff.session_kind,
);
HandoffOutput::StandaloneAssistantOutput {
output_text: truncate_realtime_text_to_token_budget(
&output_text,
REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET,
),
let output_text = realtime_backend_output(output_text, handoff.session_kind);
if handoff.codex_responses_as_items {
RealtimeOutbound::ConversationItem {
text: realtime_backend_item(
output_text,
handoff.codex_response_item_prefix.as_deref(),
),
}
} else {
RealtimeOutbound::StandaloneHandoff { text: output_text }
}
}
};
@@ -514,6 +532,31 @@ impl RealtimeConversationManager {
Ok(())
}
/// Queues app-provided `text` on the standalone-handoff output path.
///
/// Text that is empty after trimming is ignored and reported as success,
/// without checking whether a conversation is running.
///
/// # Errors
///
/// Returns `CodexErr::InvalidRequest("conversation is not running")` when no
/// conversation state is present or when the outbound channel rejects the send.
pub(crate) async fn append_speech(&self, text: String) -> CodexResult<()> {
    if text.trim().is_empty() {
        return Ok(());
    }

    let not_running = || CodexErr::InvalidRequest("conversation is not running".to_string());

    // The state lock is only held for this statement; the clone lets the send
    // below proceed without it.
    let handoff = self
        .state
        .lock()
        .await
        .as_ref()
        .map(|state| state.handoff.clone())
        .ok_or_else(not_running)?;

    let speech = RealtimeOutbound::StandaloneHandoff {
        text: realtime_backend_output(text, handoff.session_kind),
    };
    handoff
        .output_tx
        .send(speech)
        .await
        .map_err(|_| not_running())?;
    Ok(())
}
pub(crate) async fn handoff_complete(&self) -> CodexResult<()> {
let handoff = {
let guard = self.state.lock().await;
@@ -534,12 +577,18 @@ impl RealtimeConversationManager {
return Ok(());
};
let output = if handoff.codex_responses_as_items {
RealtimeOutbound::HandoffCompleteAck { handoff_id }
} else {
RealtimeOutbound::CompletedHandoff {
handoff_id,
text: output_text,
}
};
handoff
.output_tx
.send(HandoffOutput::FinalUpdate {
handoff_id,
output_text,
})
.send(output)
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))
}
@@ -626,6 +675,8 @@ struct PreparedRealtimeConversationStart {
api_provider: ApiProvider,
architecture: RealtimeConversationArchitecture,
extra_headers: Option<HeaderMap>,
codex_responses_as_items: bool,
codex_response_item_prefix: Option<String>,
realtime_call_api_provider: Option<ApiProvider>,
requested_realtime_session_id: Option<String>,
version: RealtimeWsVersion,
@@ -701,6 +752,8 @@ async fn prepare_realtime_start(
api_provider,
architecture,
extra_headers,
codex_responses_as_items: params.codex_responses_as_items,
codex_response_item_prefix: params.codex_response_item_prefix,
realtime_call_api_provider,
requested_realtime_session_id,
version,
@@ -812,6 +865,19 @@ fn prefix_realtime_text(text: String, prefix: &str, session_kind: RealtimeSessio
format!("{prefix}{text}")
}
/// Applies the backend text prefix for `session_kind` to `output_text`, then
/// truncates the result to the assistant output token budget.
fn realtime_backend_output(output_text: String, session_kind: RealtimeSessionKind) -> String {
    let prefixed = prefix_realtime_text(output_text, REALTIME_BACKEND_TEXT_PREFIX, session_kind);
    truncate_realtime_text_to_token_budget(&prefixed, REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET)
}
/// Builds the text of an automatic Codex response item.
///
/// A non-empty `prefix` is placed before `text`, separated by a blank line; a
/// missing or empty prefix leaves `text` as is. The combined text is then
/// truncated to the assistant output token budget.
fn realtime_backend_item(text: String, prefix: Option<&str>) -> String {
    let item_text = match prefix {
        Some(prefix) if !prefix.is_empty() => format!("{prefix}\n\n{text}"),
        _ => text,
    };
    truncate_realtime_text_to_token_budget(&item_text, REALTIME_ASSISTANT_OUTPUT_TOKEN_BUDGET)
}
fn validate_realtime_voice(version: RealtimeWsVersion, voice: RealtimeVoice) -> CodexResult<()> {
let voices = RealtimeVoicesList::builtin();
let allowed = match version {
@@ -846,6 +912,8 @@ async fn handle_start_inner(
api_provider,
architecture,
extra_headers,
codex_responses_as_items,
codex_response_item_prefix,
realtime_call_api_provider,
requested_realtime_session_id,
version,
@@ -861,6 +929,8 @@ async fn handle_start_inner(
api_provider,
architecture,
extra_headers,
codex_responses_as_items,
codex_response_item_prefix,
realtime_call_api_provider,
session_config,
model_client: sess.services.model_client.clone(),
@@ -1089,6 +1159,23 @@ pub(crate) async fn handle_text(
}
}
/// Handles a realtime speech-append submission by forwarding `params.text` to
/// the session's conversation.
///
/// A failed append is always logged. If the conversation still reports a
/// running state, only a warning is logged; otherwise a `BadRequest`
/// conversation error is sent for `sub_id`.
pub(crate) async fn handle_speech(
    sess: &Arc<Session>,
    sub_id: String,
    params: ConversationSpeechParams,
) {
    debug!(text = %params.text, "[realtime-text] appending realtime speech");

    let Err(err) = sess.conversation.append_speech(params.text).await else {
        return;
    };
    error!("failed to append realtime speech: {err}");

    let still_running = sess.conversation.running_state().await.is_some();
    if still_running {
        warn!("realtime speech append failed while the session was already ending");
        return;
    }
    send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
}
/// Ends the realtime conversation for `sub_id`, recording the end as
/// explicitly requested.
pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
    let end_reason = RealtimeConversationEnd::Requested;
    end_realtime_conversation(sess, sub_id, end_reason).await;
}
@@ -1256,7 +1343,7 @@ async fn handle_text_input(
}
async fn handle_handoff_output(
handoff_output: Result<HandoffOutput, RecvError>,
handoff_output: Result<RealtimeOutbound, RecvError>,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
handoff_state: &RealtimeHandoffState,
@@ -1267,45 +1354,39 @@ async fn handle_handoff_output(
let result = match event_parser {
RealtimeEventParser::V1 => match handoff_output {
HandoffOutput::StandaloneAssistantOutput { output_text } => {
RealtimeOutbound::StandaloneHandoff { text } => {
// TODO(guinness): Use the new client event for standalone handoffs once the API changes are complete.
writer
.send_conversation_handoff_append(
STANDALONE_HANDOFF_ID.to_string(),
output_text,
)
.send_conversation_handoff_append(STANDALONE_HANDOFF_ID.to_string(), text)
.await
}
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
}
| HandoffOutput::FinalUpdate {
handoff_id,
output_text,
} => {
RealtimeOutbound::HandoffUpdate { handoff_id, text }
| RealtimeOutbound::CompletedHandoff { handoff_id, text } => {
writer
.send_conversation_function_call_output(handoff_id, output_text)
.send_conversation_function_call_output(handoff_id, text)
.await
}
RealtimeOutbound::ConversationItem { text } => {
writer
.send_conversation_item_create(text, ConversationTextRole::Developer)
.await
}
RealtimeOutbound::HandoffCompleteAck { .. } => Ok(()),
},
RealtimeEventParser::RealtimeV2 => match handoff_output {
HandoffOutput::StandaloneAssistantOutput { output_text } => {
RealtimeOutbound::StandaloneHandoff { text } => {
if let Err(err) = writer
.send_conversation_item_create(output_text, ConversationTextRole::User)
.send_conversation_item_create(text, ConversationTextRole::User)
.await
{
Err(err)
} else {
return response_create_queue
.request_create(writer, events_tx, "standalone assistant output")
.request_create(writer, events_tx, "standalone handoff")
.await;
}
}
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
} => {
RealtimeOutbound::HandoffUpdate { handoff_id, text } => {
let active_handoff = handoff_state.active_handoff.lock().await.clone();
match active_handoff {
Some(active_handoff) if active_handoff == handoff_id => {}
@@ -1315,12 +1396,12 @@ async fn handle_handoff_output(
}
}
writer
.send_conversation_item_create(output_text, ConversationTextRole::User)
.send_conversation_item_create(text, ConversationTextRole::User)
.await
}
HandoffOutput::FinalUpdate {
RealtimeOutbound::CompletedHandoff {
handoff_id,
output_text: _,
text: _,
} => {
if let Err(err) = writer
.send_conversation_function_call_output(
@@ -1336,6 +1417,16 @@ async fn handle_handoff_output(
.await;
}
}
RealtimeOutbound::ConversationItem { text } => {
writer
.send_conversation_item_create(text, ConversationTextRole::Developer)
.await
}
RealtimeOutbound::HandoffCompleteAck { handoff_id } => {
writer
.send_conversation_function_call_output(handoff_id, String::new())
.await
}
},
};
if let Err(err) = result {
@@ -1449,7 +1540,6 @@ async fn handle_realtime_server_event(
match session_kind {
RealtimeSessionKind::V1 => {
*handoff_state.last_output_text.lock().await = None;
*handoff_state.active_handoff.lock().await = Some(handoff.handoff_id.clone());
}
RealtimeSessionKind::V2 => {
@@ -1477,7 +1567,6 @@ async fn handle_realtime_server_event(
.await?;
}
None => {
*handoff_state.last_output_text.lock().await = None;
*handoff_state.active_handoff.lock().await =
Some(handoff.handoff_id.clone());
}
@@ -128,7 +128,12 @@ fn wraps_realtime_delegation_input_with_xml_escaping_without_transcript() {
#[tokio::test]
async fn clears_active_handoff_explicitly() {
let (tx, _rx) = bounded(1);
let state = RealtimeHandoffState::new(tx, RealtimeSessionKind::V1);
let state = RealtimeHandoffState::new(
tx,
/*codex_responses_as_items*/ false,
/*codex_response_item_prefix*/ None,
RealtimeSessionKind::V1,
);
*state.active_handoff.lock().await = Some("handoff_1".to_string());
assert_eq!(
+5
View File
@@ -1,5 +1,6 @@
use crate::realtime_conversation::handle_audio as handle_realtime_conversation_audio;
use crate::realtime_conversation::handle_close as handle_realtime_conversation_close;
use crate::realtime_conversation::handle_speech as handle_realtime_conversation_speech;
use crate::realtime_conversation::handle_start as handle_realtime_conversation_start;
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
use async_channel::Receiver;
@@ -737,6 +738,10 @@ pub(super) async fn submission_loop(
handle_realtime_conversation_text(&sess, sub.id.clone(), params).await;
false
}
Op::RealtimeConversationSpeech(params) => {
handle_realtime_conversation_speech(&sess, sub.id.clone(), params).await;
false
}
Op::RealtimeConversationClose => {
handle_realtime_conversation_close(&sess, sub.id.clone()).await;
false
@@ -206,6 +206,8 @@ async fn start_realtime_conversation(codex: &codex_core::CodexThread) -> Result<
codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -285,6 +285,8 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -427,6 +429,8 @@ async fn conversation_start_defaults_to_v2_and_gpt_realtime_1_5() -> Result<()>
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -518,6 +522,8 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: Some("session-override-model".to_string()),
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -698,6 +704,8 @@ async fn conversation_webrtc_start_uses_avas_architecture_query() -> Result<()>
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: Some(RealtimeConversationArchitecture::Avas),
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -796,6 +804,8 @@ async fn conversation_webrtc_start_uses_configured_call_base_url_for_avas() -> R
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: Some(RealtimeConversationArchitecture::Avas),
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -886,6 +896,8 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -973,6 +985,8 @@ async fn conversation_webrtc_sideband_connect_failure_closes_with_error() -> Res
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -1062,6 +1076,8 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() ->
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -1131,6 +1147,8 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -1224,6 +1242,8 @@ async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Res
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -1271,6 +1291,8 @@ async fn conversation_start_connect_failure_emits_realtime_error_only() -> Resul
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -1366,6 +1388,8 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("old".to_string())),
@@ -1392,6 +1416,8 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("new".to_string())),
@@ -1489,6 +1515,8 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -1554,6 +1582,8 @@ async fn conversation_uses_default_realtime_backend_prompt() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: None,
@@ -1627,6 +1657,8 @@ async fn conversation_uses_empty_instructions_for_null_or_empty_prompt() -> Resu
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt,
@@ -1693,6 +1725,8 @@ async fn conversation_uses_explicit_start_voice() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -1751,6 +1785,8 @@ async fn conversation_uses_configured_realtime_voice() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -1797,6 +1833,8 @@ async fn conversation_rejects_voice_for_wrong_realtime_version() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -1844,6 +1882,8 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("prompt from op".to_string())),
@@ -1917,6 +1957,8 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("prompt from op".to_string())),
@@ -1984,6 +2026,8 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("prompt from op".to_string())),
@@ -2044,6 +2088,8 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -2158,6 +2204,8 @@ async fn conversation_startup_context_current_thread_selects_many_turns_by_budge
codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -2266,6 +2314,8 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -2326,6 +2376,8 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -2407,6 +2459,8 @@ async fn conversation_user_text_turn_is_not_sent_to_realtime() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -2504,6 +2558,8 @@ async fn realtime_v2_noop_tool_call_returns_empty_function_output_without_respon
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -2603,6 +2659,8 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Re
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -2740,6 +2798,8 @@ async fn conversation_handoff_persists_across_item_done_until_turn_complete() ->
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -2892,6 +2952,8 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -2994,6 +3056,8 @@ async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -3097,6 +3161,8 @@ async fn inbound_handoff_request_sends_transcript_delta_after_each_handoff() ->
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -3198,6 +3264,8 @@ async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio(
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -3321,6 +3389,8 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -3474,6 +3544,8 @@ async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> R
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -3616,6 +3688,8 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
@@ -3769,6 +3843,8 @@ async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio()
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
architecture: None,
codex_responses_as_items: false,
codex_response_item_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
prompt: Some(Some("backend prompt".to_string())),
+13
View File
@@ -182,6 +182,10 @@ pub struct McpServerRefreshConfig {
pub struct ConversationStartParams {
/// Overrides the configured realtime architecture for this session only.
pub architecture: Option<RealtimeConversationArchitecture>,
/// Sends automatic Codex responses as regular realtime conversation items instead of the selected protocol's default speakable output path.
pub codex_responses_as_items: bool,
/// Optional prefix added to automatic Codex response items when `codex_responses_as_items` is set.
pub codex_response_item_prefix: Option<String>,
/// Overrides the configured realtime model for this session only.
pub model: Option<String>,
/// Selects whether the realtime session should produce text or audio output.
@@ -407,6 +411,11 @@ pub enum ConversationTextRole {
Developer,
}
/// Parameters carried by `Op::RealtimeConversationSpeech`: speakable text to
/// append to the running realtime conversation stream.
#[derive(Debug, Clone, PartialEq)]
pub struct ConversationSpeechParams {
/// Text to append as speakable content.
pub text: String,
}
/// Persistent thread-settings overrides that can be applied before user input or
/// on their own.
#[derive(Debug, Clone, Default, PartialEq)]
@@ -503,6 +512,9 @@ pub enum Op {
/// Send text input to the running realtime conversation stream.
RealtimeConversationText(ConversationTextParams),
/// Append speakable text to the running realtime conversation stream.
RealtimeConversationSpeech(ConversationSpeechParams),
/// Close the running realtime conversation stream.
RealtimeConversationClose,
@@ -762,6 +774,7 @@ impl Op {
Self::RealtimeConversationStart(_) => "realtime_conversation_start",
Self::RealtimeConversationAudio(_) => "realtime_conversation_audio",
Self::RealtimeConversationText(_) => "realtime_conversation_text",
Self::RealtimeConversationSpeech(_) => "realtime_conversation_speech",
Self::RealtimeConversationClose => "realtime_conversation_close",
Self::RealtimeConversationListVoices => "realtime_conversation_list_voices",
Self::UserInput { .. } => "user_input",