From 216dee1189fd589ea6c0741a5f92f578a3ca4640 Mon Sep 17 00:00:00 2001 From: Alex Gamble Date: Fri, 12 Jun 2026 15:05:37 -0700 Subject: [PATCH] [codex] add roles to realtime append text (#27936) ## Summary Add an explicit `user` or `developer` role to `thread/realtime/appendText` and propagate it through the realtime input queue into `conversation.item.create`. Older JSON clients that omit the field continue to default to `user`. This lets app-provided context such as memory retain developer authority without bypassing app-server through a renderer-owned data channel. The app-server schemas, API documentation, and focused protocol and websocket coverage are updated with the new contract. The Codex Apps consumer is tracked in [openai/openai#1025261](https://github.com/openai/openai/pull/1025261). --- .../schema/json/ClientRequest.json | 7 ++ .../codex_app_server_protocol.schemas.json | 7 ++ .../codex_app_server_protocol.v2.schemas.json | 7 ++ .../schema/typescript/ConversationTextRole.ts | 5 ++ .../schema/typescript/index.ts | 1 + .../src/protocol/v2/realtime.rs | 3 + .../src/protocol/v2/tests.rs | 19 ++++++ codex-rs/app-server/README.md | 2 +- .../src/request_processors/turn_processor.rs | 5 +- .../tests/suite/v2/realtime_conversation.rs | 22 +++++++ .../endpoint/realtime_websocket/methods.rs | 35 ++++++++-- .../realtime_websocket/methods_common.rs | 6 +- .../endpoint/realtime_websocket/methods_v1.rs | 9 ++- .../endpoint/realtime_websocket/methods_v2.rs | 9 ++- .../endpoint/realtime_websocket/protocol.rs | 9 +-- codex-rs/core/src/realtime_conversation.rs | 64 +++++++++++-------- .../core/tests/suite/realtime_conversation.rs | 6 ++ codex-rs/protocol/src/protocol.rs | 10 +++ 18 files changed, 176 insertions(+), 50 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/typescript/ConversationTextRole.ts diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 
0829bcb1a..5e5924fc1 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -628,6 +628,13 @@ } ] }, + "ConversationTextRole": { + "enum": [ + "user", + "developer" + ], + "type": "string" + }, "DynamicToolSpec": { "properties": { "deferLoading": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 41040e9fd..e2c131446 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -8539,6 +8539,13 @@ "title": "ContextCompactedNotification", "type": "object" }, + "ConversationTextRole": { + "enum": [ + "user", + "developer" + ], + "type": "string" + }, "CreditsSnapshot": { "properties": { "balance": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index da1370be3..fc91ed3f7 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -4852,6 +4852,13 @@ "title": "ContextCompactedNotification", "type": "object" }, + "ConversationTextRole": { + "enum": [ + "user", + "developer" + ], + "type": "string" + }, "CreditsSnapshot": { "properties": { "balance": { diff --git a/codex-rs/app-server-protocol/schema/typescript/ConversationTextRole.ts b/codex-rs/app-server-protocol/schema/typescript/ConversationTextRole.ts new file mode 100644 index 000000000..9cba89f83 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/ConversationTextRole.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). 
Do not edit this file manually. + +export type ConversationTextRole = "user" | "developer"; diff --git a/codex-rs/app-server-protocol/schema/typescript/index.ts b/codex-rs/app-server-protocol/schema/typescript/index.ts index 7aa64ab00..ad6376fa5 100644 --- a/codex-rs/app-server-protocol/schema/typescript/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/index.ts @@ -14,6 +14,7 @@ export type { CollaborationMode } from "./CollaborationMode"; export type { ContentItem } from "./ContentItem"; export type { ConversationGitInfo } from "./ConversationGitInfo"; export type { ConversationSummary } from "./ConversationSummary"; +export type { ConversationTextRole } from "./ConversationTextRole"; export type { ExecCommandApprovalParams } from "./ExecCommandApprovalParams"; export type { ExecCommandApprovalResponse } from "./ExecCommandApprovalResponse"; export type { ExecPolicyAmendment } from "./ExecPolicyAmendment"; diff --git a/codex-rs/app-server-protocol/src/protocol/v2/realtime.rs b/codex-rs/app-server-protocol/src/protocol/v2/realtime.rs index faeef68fc..bde44f9a0 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/realtime.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/realtime.rs @@ -1,3 +1,4 @@ +use codex_protocol::protocol::ConversationTextRole; use codex_protocol::protocol::RealtimeAudioFrame as CoreRealtimeAudioFrame; use codex_protocol::protocol::RealtimeConversationArchitecture; use codex_protocol::protocol::RealtimeConversationVersion; @@ -135,6 +136,8 @@ pub struct ThreadRealtimeAppendAudioResponse {} pub struct ThreadRealtimeAppendTextParams { pub thread_id: String, pub text: String, + #[serde(default)] + pub role: ConversationTextRole, } /// EXPERIMENTAL - response for appending realtime text input. 
diff --git a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs index c570b87ce..c919d957b 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/tests.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/tests.rs @@ -27,6 +27,7 @@ use codex_protocol::permissions::FileSystemSandboxEntry as CoreFileSystemSandbox use codex_protocol::permissions::FileSystemSpecialPath as CoreFileSystemSpecialPath; use codex_protocol::protocol::AgentStatus as CoreAgentStatus; use codex_protocol::protocol::AskForApproval as CoreAskForApproval; +use codex_protocol::protocol::ConversationTextRole; use codex_protocol::protocol::GranularApprovalConfig as CoreGranularApprovalConfig; use codex_protocol::protocol::NetworkAccess as CoreNetworkAccess; use codex_protocol::request_permissions::RequestPermissionProfile as CoreRequestPermissionProfile; @@ -3862,3 +3863,21 @@ fn turn_start_params_reject_relative_environment_cwd() { "unexpected error: {err}" ); } + +#[test] +fn realtime_append_text_defaults_role_to_user() { + let params = serde_json::from_value::(json!({ + "threadId": "thread_123", + "text": "hello", + })) + .expect("params should deserialize"); + + assert_eq!( + params, + ThreadRealtimeAppendTextParams { + thread_id: "thread_123".to_string(), + text: "hello".to_string(), + role: ConversationTextRole::User, + } + ); +} diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 956bbb49c..206cb4803 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -167,7 +167,7 @@ Example with notification opt-out: - `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`. 
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); pass `outputModality: "text"` or `outputModality: "audio"` to choose model output, and optionally pass `model` and `version` to override configured realtime selection for this session only. Returns `{}` and streams `thread/realtime/*` notifications. Omit `transport` for the websocket transport, or pass `{ "type": "webrtc", "sdp": "..." }` to create a WebRTC session from a browser-generated SDP offer; the remote answer SDP is emitted as `thread/realtime/sdp`. - `thread/realtime/appendAudio` — append an input audio chunk to the active realtime session (experimental); returns `{}`. -- `thread/realtime/appendText` — append text input to the active realtime session (experimental); returns `{}`. +- `thread/realtime/appendText` — append text input to the active realtime session with a `role` of `user` or `developer` (experimental); returns `{}`. `role` defaults to `user` when omitted, so older clients keep working. - `thread/realtime/stop` — stop the active realtime session for the thread (experimental); returns `{}`. - `review/start` — kick off Codex’s automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review. - `command/exec` — run a single command under the server sandbox without starting a thread/turn (handy for utilities and validation). 
diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index f25c589e5..b9d6656cc 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -997,7 +997,10 @@ impl TurnRequestProcessor { self.submit_core_op( request_id, thread.as_ref(), - Op::RealtimeConversationText(ConversationTextParams { text: params.text }), + Op::RealtimeConversationText(ConversationTextParams { + text: params.text, + role: params.role, + }), ) .await .map_err(|err| { diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 50ed876ec..98549ef36 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -42,6 +42,7 @@ use codex_app_server_protocol::TurnStartedNotification; use codex_app_server_protocol::UserInput as V2UserInput; use codex_features::FEATURES; use codex_features::Feature; +use codex_protocol::protocol::ConversationTextRole; use codex_protocol::protocol::RealtimeConversationVersion; use codex_protocol::protocol::RealtimeOutputModality; use codex_protocol::protocol::RealtimeVoice; @@ -393,6 +394,7 @@ impl RealtimeE2eHarness { .send_thread_realtime_append_text_request(ThreadRealtimeAppendTextParams { thread_id, text: text.to_string(), + role: ConversationTextRole::User, }) .await?; let response: JSONRPCResponse = timeout( @@ -636,6 +638,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { .send_thread_realtime_append_text_request(ThreadRealtimeAppendTextParams { thread_id: started.thread_id.clone(), text: "hello".to_string(), + role: ConversationTextRole::Developer, }) .await?; let text_append_response: JSONRPCResponse = timeout( @@ -735,6 +738,25 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { 
connection[0].body_json()["session"]["instructions"].as_str(), Some(startup_context_instructions.as_str()), ); + let text_request = connection + .iter() + .map(WebSocketRequest::body_json) + .find(|request| request["type"] == "conversation.item.create") + .context("expected conversation item request")?; + assert_eq!( + text_request, + json!({ + "type": "conversation.item.create", + "item": { + "type": "message", + "role": "developer", + "content": [{ + "type": "input_text", + "text": "hello", + }], + }, + }) + ); let mut request_types = [ connection[1].body_json()["type"] .as_str() diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index d9a67c7b9..0aa2feb04 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -17,6 +17,7 @@ use crate::error::ApiError; use crate::provider::Provider; use codex_client::backoff; use codex_client::maybe_build_rustls_client_config_with_custom_ca; +use codex_protocol::protocol::ConversationTextRole; use codex_protocol::protocol::RealtimeTranscriptDelta; use codex_utils_rustls_provider::ensure_rustls_crypto_provider; use futures::SinkExt; @@ -227,8 +228,12 @@ impl RealtimeWebsocketConnection { self.writer.send_audio_frame(frame).await } - pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> { - self.writer.send_conversation_item_create(text).await + pub async fn send_conversation_item_create( + &self, + text: String, + role: ConversationTextRole, + ) -> Result<(), ApiError> { + self.writer.send_conversation_item_create(text, role).await } pub async fn send_conversation_function_call_output( @@ -286,9 +291,17 @@ impl RealtimeWebsocketWriter { .await } - pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> { - self.send_json(&conversation_item_create_message(self.event_parser, text)) - .await + 
pub async fn send_conversation_item_create( + &self, + text: String, + role: ConversationTextRole, + ) -> Result<(), ApiError> { + self.send_json(&conversation_item_create_message( + self.event_parser, + text, + role, + )) + .await } pub async fn send_conversation_handoff_append( @@ -1609,6 +1622,7 @@ mod tests { .expect("text"); let third_json: Value = serde_json::from_str(&third).expect("json"); assert_eq!(third_json["type"], "conversation.item.create"); + assert_eq!(third_json["item"]["role"], "developer"); assert_eq!( third_json["item"]["content"][0]["type"], Value::String("input_text".to_string()) @@ -1746,7 +1760,10 @@ mod tests { .await .expect("send audio"); connection - .send_conversation_item_create("hello agent".to_string()) + .send_conversation_item_create( + "hello agent".to_string(), + ConversationTextRole::Developer, + ) .await .expect("send item"); connection @@ -1948,6 +1965,7 @@ mod tests { .expect("text"); let second_json: Value = serde_json::from_str(&second).expect("json"); assert_eq!(second_json["type"], "conversation.item.create"); + assert_eq!(second_json["item"]["role"], "developer"); assert_eq!( second_json["item"]["type"], Value::String("message".to_string()) @@ -2030,7 +2048,10 @@ mod tests { ); connection - .send_conversation_item_create("delegate this".to_string()) + .send_conversation_item_create( + "delegate this".to_string(), + ConversationTextRole::Developer, + ) .await .expect("send text item"); connection diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs index 1e47fb6fb..131cf27a9 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_common.rs @@ -13,6 +13,7 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig; use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode; use 
crate::endpoint::realtime_websocket::protocol::RealtimeVoice; use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; +use codex_protocol::protocol::ConversationTextRole; use serde_json::Result as JsonResult; use serde_json::Value; use serde_json::to_value; @@ -33,10 +34,11 @@ pub(super) fn normalized_session_mode( pub(super) fn conversation_item_create_message( event_parser: RealtimeEventParser, text: String, + role: ConversationTextRole, ) -> RealtimeOutboundMessage { match event_parser { - RealtimeEventParser::V1 => v1_conversation_item_create_message(text), - RealtimeEventParser::RealtimeV2 => v2_conversation_item_create_message(text), + RealtimeEventParser::V1 => v1_conversation_item_create_message(text, role), + RealtimeEventParser::RealtimeV2 => v2_conversation_item_create_message(text, role), } } diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs index 0f1a26908..a7f73d82f 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v1.rs @@ -5,7 +5,6 @@ use crate::endpoint::realtime_websocket::protocol::ConversationItemContent; use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload; use crate::endpoint::realtime_websocket::protocol::ConversationItemType; use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem; -use crate::endpoint::realtime_websocket::protocol::ConversationRole; use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; use crate::endpoint::realtime_websocket::protocol::RealtimeVoice; use crate::endpoint::realtime_websocket::protocol::SessionAudio; @@ -14,12 +13,16 @@ use crate::endpoint::realtime_websocket::protocol::SessionAudioInput; use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput; use crate::endpoint::realtime_websocket::protocol::SessionType; use 
crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; +use codex_protocol::protocol::ConversationTextRole; -pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage { +pub(super) fn conversation_item_create_message( + text: String, + role: ConversationTextRole, +) -> RealtimeOutboundMessage { RealtimeOutboundMessage::ConversationItemCreate { item: ConversationItemPayload::Message(ConversationMessageItem { r#type: ConversationItemType::Message, - role: ConversationRole::User, + role, content: vec![ConversationItemContent { r#type: ConversationContentType::InputText, text, diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs index 292067748..702b4be86 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods_v2.rs @@ -6,7 +6,6 @@ use crate::endpoint::realtime_websocket::protocol::ConversationItemContent; use crate::endpoint::realtime_websocket::protocol::ConversationItemPayload; use crate::endpoint::realtime_websocket::protocol::ConversationItemType; use crate::endpoint::realtime_websocket::protocol::ConversationMessageItem; -use crate::endpoint::realtime_websocket::protocol::ConversationRole; use crate::endpoint::realtime_websocket::protocol::NoiseReductionType; use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; use crate::endpoint::realtime_websocket::protocol::RealtimeOutputModality; @@ -25,6 +24,7 @@ use crate::endpoint::realtime_websocket::protocol::SessionTurnDetection; use crate::endpoint::realtime_websocket::protocol::SessionType; use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; use crate::endpoint::realtime_websocket::protocol::TurnDetectionType; +use codex_protocol::protocol::ConversationTextRole; use serde_json::json; const REALTIME_V2_OUTPUT_MODALITY_AUDIO: &str = "audio"; @@ -36,11 
+36,14 @@ const REALTIME_V2_SILENCE_TOOL_NAME: &str = "remain_silent"; const REALTIME_V2_SILENCE_TOOL_DESCRIPTION: &str = "Call this when the best response is to say nothing. Use it instead of speaking after hidden system/control messages, after background agent updates in silent modes, or whenever acknowledging aloud would be distracting. This tool has no user-visible effect."; const REALTIME_V2_INPUT_TRANSCRIPTION_MODEL: &str = "gpt-4o-mini-transcribe"; -pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage { +pub(super) fn conversation_item_create_message( + text: String, + role: ConversationTextRole, +) -> RealtimeOutboundMessage { RealtimeOutboundMessage::ConversationItemCreate { item: ConversationItemPayload::Message(ConversationMessageItem { r#type: ConversationItemType::Message, - role: ConversationRole::User, + role, content: vec![ConversationItemContent { r#type: ConversationContentType::InputText, text, diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs index 5df4c0c50..8f61fc1d1 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs @@ -1,5 +1,6 @@ use crate::endpoint::realtime_websocket::protocol_v1::parse_realtime_event_v1; use crate::endpoint::realtime_websocket::protocol_v2::parse_realtime_event_v2; +use codex_protocol::protocol::ConversationTextRole; pub use codex_protocol::protocol::RealtimeAudioFrame; pub use codex_protocol::protocol::RealtimeEvent; pub use codex_protocol::protocol::RealtimeOutputModality; @@ -157,7 +158,7 @@ pub(super) struct SessionAudioOutputFormat { pub(super) struct ConversationMessageItem { #[serde(rename = "type")] pub(super) r#type: ConversationItemType, - pub(super) role: ConversationRole, + pub(super) role: ConversationTextRole, pub(super) content: Vec, } @@ -168,12 +169,6 @@ pub(super) enum 
ConversationItemType { FunctionCallOutput, } -#[derive(Debug, Clone, Copy, Serialize)] -#[serde(rename_all = "snake_case")] -pub(super) enum ConversationRole { - User, -} - #[derive(Debug, Clone, Serialize)] #[serde(untagged)] pub(super) enum ConversationItemPayload { diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index c14bf0af5..a1a861c87 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -35,6 +35,7 @@ use codex_protocol::protocol::ConversationAudioParams; use codex_protocol::protocol::ConversationStartParams; use codex_protocol::protocol::ConversationStartTransport; use codex_protocol::protocol::ConversationTextParams; +use codex_protocol::protocol::ConversationTextRole; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; @@ -62,7 +63,7 @@ use tracing::info; use tracing::warn; const AUDIO_IN_QUEUE_CAPACITY: usize = 256; -const USER_TEXT_IN_QUEUE_CAPACITY: usize = 64; +const TEXT_IN_QUEUE_CAPACITY: usize = 64; const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64; const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256; const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_300; @@ -194,7 +195,7 @@ impl RealtimeResponseCreateQueue { struct RealtimeInputTask { writer: RealtimeWebsocketWriter, events: RealtimeWebsocketEvents, - user_text_rx: Receiver, + text_rx: Receiver, handoff_output_rx: Receiver, audio_rx: Receiver, events_tx: Sender, @@ -204,7 +205,7 @@ struct RealtimeInputTask { } struct RealtimeInputChannels { - user_text_rx: Receiver, + text_rx: Receiver, handoff_output_rx: Receiver, audio_rx: Receiver, } @@ -223,7 +224,7 @@ impl RealtimeHandoffState { #[allow(dead_code)] struct ConversationState { audio_tx: Sender, - user_text_tx: Sender, + text_tx: Sender, session_kind: RealtimeSessionKind, handoff: RealtimeHandoffState, input_task: JoinHandle<()>, @@ -302,8 +303,8 @@ impl 
RealtimeConversationManager { let (audio_tx, audio_rx) = async_channel::bounded::(AUDIO_IN_QUEUE_CAPACITY); - let (user_text_tx, user_text_rx) = - async_channel::bounded::(USER_TEXT_IN_QUEUE_CAPACITY); + let (text_tx, text_rx) = + async_channel::bounded::(TEXT_IN_QUEUE_CAPACITY); let (handoff_output_tx, handoff_output_rx) = async_channel::bounded::(HANDOFF_OUT_QUEUE_CAPACITY); let (events_tx, events_rx) = @@ -312,7 +313,7 @@ impl RealtimeConversationManager { let realtime_active = Arc::new(AtomicBool::new(true)); let handoff = RealtimeHandoffState::new(handoff_output_tx, session_kind); let input_channels = RealtimeInputChannels { - user_text_rx, + text_rx, handoff_output_rx, audio_rx, }; @@ -353,7 +354,7 @@ impl RealtimeConversationManager { let task = spawn_realtime_input_task(RealtimeInputTask { writer: connection.writer(), events: connection.events(), - user_text_rx: input_channels.user_text_rx, + text_rx: input_channels.text_rx, handoff_output_rx: input_channels.handoff_output_rx, audio_rx: input_channels.audio_rx, events_tx, @@ -367,7 +368,7 @@ impl RealtimeConversationManager { let mut guard = self.state.lock().await; *guard = Some(ConversationState { audio_tx, - user_text_tx, + text_tx, session_kind, handoff, input_task: task, @@ -440,12 +441,12 @@ impl RealtimeConversationManager { } } - pub(crate) async fn text_in(&self, text: String) -> CodexResult<()> { + pub(crate) async fn text_in(&self, mut params: ConversationTextParams) -> CodexResult<()> { let sender = { let guard = self.state.lock().await; guard .as_ref() - .map(|state| (state.user_text_tx.clone(), state.session_kind)) + .map(|state| (state.text_tx.clone(), state.session_kind)) }; let Some((sender, session_kind)) = sender else { @@ -454,9 +455,12 @@ impl RealtimeConversationManager { )); }; - let text = prefix_realtime_text(text, REALTIME_USER_TEXT_PREFIX, session_kind); + if params.role == ConversationTextRole::User { + params.text = + prefix_realtime_text(params.text, REALTIME_USER_TEXT_PREFIX, 
session_kind); + } sender - .send(text) + .send(params) .await .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; Ok(()) @@ -1074,7 +1078,7 @@ pub(crate) async fn handle_text( params: ConversationTextParams, ) { debug!(text = %params.text, "[realtime-text] appending realtime conversation text input"); - if let Err(err) = sess.conversation.text_in(params.text).await { + if let Err(err) = sess.conversation.text_in(params).await { error!("failed to append realtime text: {err}"); if sess.conversation.running_state().await.is_some() { warn!("realtime text input failed while the session was already ending"); @@ -1154,7 +1158,7 @@ fn spawn_webrtc_sideband_input_task(input: RealtimeWebrtcSidebandInputTask) -> J run_realtime_input_task(RealtimeInputTask { writer: connection.writer(), events: connection.events(), - user_text_rx: input_channels.user_text_rx, + text_rx: input_channels.text_rx, handoff_output_rx: input_channels.handoff_output_rx, audio_rx: input_channels.audio_rx, events_tx, @@ -1170,7 +1174,7 @@ async fn run_realtime_input_task(input: RealtimeInputTask) { let RealtimeInputTask { writer, events, - user_text_rx, + text_rx, handoff_output_rx, audio_rx, events_tx, @@ -1184,10 +1188,10 @@ async fn run_realtime_input_task(input: RealtimeInputTask) { loop { let result = tokio::select! { - // Text typed by the user that should be sent into realtime. - user_text = user_text_rx.recv() => { - handle_user_text_input( - user_text, + // Text input that should be sent into realtime. 
+ text = text_rx.recv() => { + handle_text_input( + text, &writer, &events_tx, ) @@ -1230,14 +1234,17 @@ async fn run_realtime_input_task(input: RealtimeInputTask) { } } -async fn handle_user_text_input( - text: Result, +async fn handle_text_input( + params: Result, writer: &RealtimeWebsocketWriter, events_tx: &Sender, ) -> anyhow::Result<()> { - let text = text.context("user text input channel closed")?; + let params = params.context("text input channel closed")?; - if let Err(err) = writer.send_conversation_item_create(text).await { + if let Err(err) = writer + .send_conversation_item_create(params.text, params.role) + .await + { let mapped_error = map_api_error(err); warn!("failed to send input text: {mapped_error}"); let _ = events_tx @@ -1284,7 +1291,10 @@ async fn handle_handoff_output( }, RealtimeEventParser::RealtimeV2 => match handoff_output { HandoffOutput::StandaloneAssistantOutput { output_text } => { - if let Err(err) = writer.send_conversation_item_create(output_text).await { + if let Err(err) = writer + .send_conversation_item_create(output_text, ConversationTextRole::User) + .await + { Err(err) } else { return response_create_queue @@ -1304,7 +1314,9 @@ async fn handle_handoff_output( return Ok(()); } } - writer.send_conversation_item_create(output_text).await + writer + .send_conversation_item_create(output_text, ConversationTextRole::User) + .await } HandoffOutput::FinalUpdate { handoff_id, diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 387096944..85bc1827f 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -13,6 +13,7 @@ use codex_protocol::protocol::ConversationAudioParams; use codex_protocol::protocol::ConversationStartParams; use codex_protocol::protocol::ConversationStartTransport; use codex_protocol::protocol::ConversationTextParams; +use codex_protocol::protocol::ConversationTextRole; use 
codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::InitialHistory; @@ -331,6 +332,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { test.codex .submit(Op::RealtimeConversationText(ConversationTextParams { text: "hello".to_string(), + role: ConversationTextRole::User, })) .await?; @@ -546,6 +548,7 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> { test.codex .submit(Op::RealtimeConversationText(ConversationTextParams { text: "queued before sideband".to_string(), + role: ConversationTextRole::User, })) .await?; @@ -1015,6 +1018,7 @@ async fn conversation_webrtc_sideband_connect_failure_closes_with_error() -> Res test.codex .submit(Op::RealtimeConversationText(ConversationTextParams { text: "after sideband failure".to_string(), + role: ConversationTextRole::User, })) .await?; let err = wait_for_event_match(&test.codex, |msg| match msg { @@ -1311,6 +1315,7 @@ async fn conversation_text_before_start_emits_error() -> Result<()> { test.codex .submit(Op::RealtimeConversationText(ConversationTextParams { text: "hello".to_string(), + role: ConversationTextRole::User, })) .await?; @@ -2345,6 +2350,7 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() -> test.codex .submit(Op::RealtimeConversationText(ConversationTextParams { text: "hello".to_string(), + role: ConversationTextRole::User, })) .await?; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 27c76f0e9..48eb2352f 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -401,6 +401,16 @@ pub struct ConversationAudioParams { #[derive(Debug, Clone, PartialEq)] pub struct ConversationTextParams { pub text: String, + pub role: ConversationTextRole, +} + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize, Serialize, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +#[ts(rename_all = 
"snake_case")] +pub enum ConversationTextRole { + #[default] + User, + Developer, } /// Persistent thread-settings overrides that can be applied before user input or