Stream Realtime V2 background agent progress (#17264)

Stream Realtime V2 background agent updates while the background agent
task is still running, then send the final tool output when it
completes. User input during an active V2 handoff is acknowledged back
to realtime as a steering update.

Stack:
- Depends on #17278 for the background_agent rename.
- Depends on #17280 for the input task handler refactor.

Coverage:
- Adds an app-server integration regression test that verifies V2
progress is sent before the final function-call output.

Validation:
- just fmt
- cargo check -p codex-core
- cargo check -p codex-app-server --tests
- git diff --check

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-10 00:06:00 -07:00
committed by GitHub
Unverified
parent 4e910bf151
commit 1de0085418
2 changed files with 198 additions and 153 deletions
@@ -1222,6 +1222,7 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out
v2_background_agent_tool_call("call_v2", "delegate from v2"),
],
vec![],
vec![],
])]),
)
.await?;
@@ -1249,8 +1250,53 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out
requests[0]
);
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
let progress = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_v2_progress_update(&progress, "delegated from v2");
let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_v2_function_call_output(&tool_output, "call_v2", "delegated from v2");
assert_eq!(
function_call_output_sideband_requests(&harness.realtime_server).len(),
1
);
harness.shutdown().await;
Ok(())
}
#[tokio::test]
async fn webrtc_v2_background_agent_progress_is_sent_before_function_output() -> Result<()> {
skip_if_no_network!(Ok(()));
let mut harness = RealtimeE2eHarness::new(
RealtimeTestVersion::V2,
main_loop_responses(vec![create_final_assistant_message_sse_response(
"progress before final",
)?]),
realtime_sideband(vec![realtime_sideband_connection(vec![
vec![
session_updated("sess_v2_progress_before_final"),
v2_background_agent_tool_call("call_progress_order", "stream progress"),
],
vec![],
vec![],
])]),
)
.await?;
let started = harness.start_webrtc_realtime("v=offer\r\n").await?;
assert_eq!(started.started.version, RealtimeConversationVersion::V2);
let turn_completed = harness
.read_notification::<TurnCompletedNotification>("turn/completed")
.await?;
assert_eq!(turn_completed.thread_id, harness.thread_id);
let progress = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_v2_progress_update(&progress, "progress before final");
let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_v2_function_call_output(&tool_output, "call_progress_order", "progress before final");
harness.shutdown().await;
Ok(())
@@ -1278,6 +1324,7 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
v2_background_agent_tool_call("call_shell", "run shell through delegated turn"),
],
vec![],
vec![],
])]);
let mut harness = RealtimeE2eHarness::new_with_sandbox(
@@ -1329,7 +1376,10 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
requests[1]
);
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
let progress = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_v2_progress_update(&progress, "shell tool finished");
let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_v2_function_call_output(&tool_output, "call_shell", "shell tool finished");
assert_eq!(
function_call_output_sideband_requests(&harness.realtime_server).len(),
@@ -1379,6 +1429,7 @@ async fn webrtc_v2_tool_call_does_not_block_sideband_audio() -> Result<()> {
}),
],
vec![],
vec![],
])]),
)
.await?;
@@ -1405,7 +1456,10 @@ async fn webrtc_v2_tool_call_does_not_block_sideband_audio() -> Result<()> {
.await?;
assert_eq!(turn_completed.thread_id, harness.thread_id);
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
let progress = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_v2_progress_update(&progress, "late delegated result");
let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_v2_function_call_output(&tool_output, "call_audio", "late delegated result");
harness.shutdown().await;
@@ -1643,6 +1697,23 @@ fn assert_v2_function_call_output(request: &Value, call_id: &str, expected_outpu
);
}
fn assert_v2_progress_update(request: &Value, expected_text: &str) {
assert_eq!(
request,
&json!({
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [{
"type": "input_text",
"text": format!("{expected_text}\n\nUpdate from background agent (task hasn't finished yet):")
}]
}
})
);
}
fn assert_v1_session_update(request: &Value) -> Result<()> {
assert_eq!(request["type"].as_str(), Some("session.update"));
assert_eq!(request["session"]["type"].as_str(), Some("quicksilver"));
+124 -150
View File
@@ -47,7 +47,6 @@ use codex_protocol::protocol::RealtimeVoicesList;
use http::HeaderMap;
use http::HeaderValue;
use http::header::AUTHORIZATION;
use serde_json::Value;
use serde_json::json;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@@ -65,8 +64,10 @@ const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64;
const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256;
const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000;
const DEFAULT_REALTIME_MODEL: &str = "gpt-realtime-1.5";
const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str =
"Conversation already has an active response in progress:";
const REALTIME_V2_PROGRESS_UPDATE_SUFFIX: &str =
"\n\nUpdate from background agent (task hasn't finished yet):";
const REALTIME_V2_STEER_ACKNOWLEDGEMENT: &str =
"This was sent to steer the previous background agent task.";
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RealtimeConversationEnd {
@@ -100,11 +101,11 @@ struct RealtimeHandoffState {
#[derive(Debug, PartialEq, Eq)]
enum HandoffOutput {
ImmediateAppend {
ProgressUpdate {
handoff_id: String,
output_text: String,
},
FinalToolCall {
FinalUpdate {
handoff_id: String,
output_text: String,
},
@@ -116,11 +117,6 @@ struct OutputAudioState {
audio_end_ms: u32,
}
struct ResponseCreateState {
pending_response_create: bool,
response_in_progress: bool,
}
struct RealtimeInputTask {
writer: RealtimeWebsocketWriter,
events: RealtimeWebsocketEvents,
@@ -130,6 +126,7 @@ struct RealtimeInputTask {
events_tx: Sender<RealtimeEvent>,
handoff_state: RealtimeHandoffState,
session_kind: RealtimeSessionKind,
event_parser: RealtimeEventParser,
}
impl RealtimeHandoffState {
@@ -203,7 +200,8 @@ impl RealtimeConversationManager {
model_client,
sdp,
} = start;
let session_kind = match session_config.event_parser {
let event_parser = session_config.event_parser;
let session_kind = match event_parser {
RealtimeEventParser::V1 => RealtimeSessionKind::V1,
RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2,
};
@@ -261,6 +259,7 @@ impl RealtimeConversationManager {
events_tx,
handoff_state: handoff.clone(),
session_kind,
event_parser,
});
let mut guard = self.state.lock().await;
@@ -374,21 +373,14 @@ impl RealtimeConversationManager {
};
*handoff.last_output_text.lock().await = Some(output_text.clone());
match handoff.session_kind {
RealtimeSessionKind::V1 => {
handoff
.output_tx
.send(HandoffOutput::ImmediateAppend {
handoff_id,
output_text,
})
.await
.map_err(|_| {
CodexErr::InvalidRequest("conversation is not running".to_string())
})?;
}
RealtimeSessionKind::V2 => {}
}
handoff
.output_tx
.send(HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
})
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
Ok(())
}
@@ -414,7 +406,7 @@ impl RealtimeConversationManager {
handoff
.output_tx
.send(HandoffOutput::FinalToolCall {
.send(HandoffOutput::FinalUpdate {
handoff_id,
output_text,
})
@@ -874,26 +866,17 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
events_tx,
handoff_state,
session_kind,
event_parser,
} = input;
tokio::spawn(async move {
let mut response_create_state = ResponseCreateState {
pending_response_create: false,
response_in_progress: false,
};
let mut output_audio_state: Option<OutputAudioState> = None;
loop {
let result = tokio::select! {
// Text typed by the user that should be sent into realtime.
user_text = user_text_rx.recv() => {
handle_user_text_input(
user_text,
&writer,
&events_tx,
session_kind,
&mut response_create_state,
)
handle_user_text_input(user_text, &writer, &events_tx, session_kind)
.await
}
// Background agent progress or final output that should be sent back to realtime.
@@ -902,8 +885,8 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
background_agent_output,
&writer,
&events_tx,
session_kind,
&mut response_create_state,
&handoff_state,
event_parser,
)
.await
}
@@ -916,7 +899,6 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
&handoff_state,
session_kind,
&mut output_audio_state,
&mut response_create_state,
)
.await
}
@@ -938,7 +920,6 @@ async fn handle_user_text_input(
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
session_kind: RealtimeSessionKind,
response_create_state: &mut ResponseCreateState,
) -> anyhow::Result<()> {
let text = text.context("user text input channel closed")?;
@@ -953,18 +934,13 @@ async fn handle_user_text_input(
match session_kind {
RealtimeSessionKind::V1 => {}
RealtimeSessionKind::V2 => {
if response_create_state.response_in_progress {
response_create_state.pending_response_create = true;
} else if let Err(err) = writer.send_response_create().await {
if let Err(err) = writer.send_response_create().await {
let mapped_error = map_api_error(err);
warn!("failed to send text response.create: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
} else {
response_create_state.pending_response_create = false;
response_create_state.response_in_progress = true;
}
}
}
@@ -975,64 +951,67 @@ async fn handle_handoff_output(
handoff_output: Result<HandoffOutput, RecvError>,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
session_kind: RealtimeSessionKind,
response_create_state: &mut ResponseCreateState,
handoff_state: &RealtimeHandoffState,
event_parser: RealtimeEventParser,
) -> anyhow::Result<()> {
let handoff_output = handoff_output.context("handoff output channel closed")?;
let should_create_response = match handoff_output {
HandoffOutput::ImmediateAppend {
handoff_id,
output_text,
} => {
if let Err(err) = writer
.send_conversation_handoff_append(handoff_id, output_text)
.await
{
let mapped_error = map_api_error(err);
warn!("failed to send handoff output: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
let result = match event_parser {
RealtimeEventParser::V1 => match handoff_output {
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
}
false
}
HandoffOutput::FinalToolCall {
handoff_id,
output_text,
} => {
if let Err(err) = writer
.send_conversation_handoff_append(handoff_id, output_text)
.await
{
let mapped_error = map_api_error(err);
warn!("failed to send handoff output: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
| HandoffOutput::FinalUpdate {
handoff_id,
output_text,
} => {
writer
.send_conversation_handoff_append(handoff_id, output_text)
.await
}
match session_kind {
RealtimeSessionKind::V1 => false,
RealtimeSessionKind::V2 => true,
},
RealtimeEventParser::RealtimeV2 => match handoff_output {
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
} => {
let active_handoff = handoff_state.active_handoff.lock().await.clone();
match active_handoff {
Some(active_handoff) if active_handoff == handoff_id => {}
Some(_) | None => {
debug!("dropping stale realtime handoff progress update");
return Ok(());
}
}
writer
.send_conversation_item_create(format!(
"{output_text}{REALTIME_V2_PROGRESS_UPDATE_SUFFIX}"
))
.await
}
}
HandoffOutput::FinalUpdate {
handoff_id,
output_text,
} => {
if let Err(err) = writer
.send_conversation_handoff_append(handoff_id, output_text)
.await
{
Err(err)
} else {
writer.send_response_create().await
}
}
},
};
if should_create_response {
if response_create_state.response_in_progress {
response_create_state.pending_response_create = true;
} else if let Err(err) = writer.send_response_create().await {
let mapped_error = map_api_error(err);
warn!("failed to send handoff response.create: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
} else {
response_create_state.pending_response_create = false;
response_create_state.response_in_progress = true;
}
if let Err(err) = result {
let mapped_error = map_api_error(err);
warn!("failed to send handoff output: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
}
Ok(())
}
@@ -1044,7 +1023,6 @@ async fn handle_realtime_server_event(
handoff_state: &RealtimeHandoffState,
session_kind: RealtimeSessionKind,
output_audio_state: &mut Option<OutputAudioState>,
response_create_state: &mut ResponseCreateState,
) -> anyhow::Result<()> {
let event = match event {
Ok(Some(event)) => event,
@@ -1063,19 +1041,7 @@ async fn handle_realtime_server_event(
}
};
let mut forward_event = true;
let should_stop = match &event {
RealtimeEvent::ConversationItemAdded(item) => {
match session_kind {
RealtimeSessionKind::V1 => {}
RealtimeSessionKind::V2 => {
if let Some("response.created") = item.get("type").and_then(Value::as_str) {
response_create_state.response_in_progress = true;
}
}
}
false
}
RealtimeEvent::AudioOut(frame) => {
match session_kind {
RealtimeSessionKind::V1 => {}
@@ -1114,59 +1080,67 @@ async fn handle_realtime_server_event(
false
}
RealtimeEvent::ResponseCancelled(_) => {
response_create_state.response_in_progress = false;
*output_audio_state = None;
false
}
RealtimeEvent::HandoffRequested(handoff) => {
*output_audio_state = None;
match session_kind {
RealtimeSessionKind::V1 => {}
RealtimeSessionKind::V1 => {
*handoff_state.last_output_text.lock().await = None;
*handoff_state.active_handoff.lock().await = Some(handoff.handoff_id.clone());
}
RealtimeSessionKind::V2 => {
if response_create_state.pending_response_create {
if let Err(err) = writer.send_response_create().await {
let mapped_error = map_api_error(err);
warn!(
"failed to send deferred response.create after cancellation: {mapped_error}"
);
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
let active_handoff = handoff_state.active_handoff.lock().await.clone();
match active_handoff {
Some(_) => {
if let Err(err) = writer
.send_conversation_handoff_append(
handoff.handoff_id.clone(),
REALTIME_V2_STEER_ACKNOWLEDGEMENT.to_string(),
)
.await
{
let mapped_error = map_api_error(err);
warn!(
"failed to send handoff steering acknowledgement: {mapped_error}"
);
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
}
if let Err(err) = writer.send_response_create().await {
let mapped_error = map_api_error(err);
warn!(
"failed to send handoff steering response.create: {mapped_error}"
);
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
}
}
None => {
*handoff_state.last_output_text.lock().await = None;
*handoff_state.active_handoff.lock().await =
Some(handoff.handoff_id.clone());
}
response_create_state.pending_response_create = false;
response_create_state.response_in_progress = true;
}
}
}
false
}
RealtimeEvent::HandoffRequested(handoff) => {
*handoff_state.active_handoff.lock().await = Some(handoff.handoff_id.clone());
*handoff_state.last_output_text.lock().await = None;
response_create_state.response_in_progress = false;
*output_audio_state = None;
false
}
RealtimeEvent::Error(message) => match session_kind {
RealtimeSessionKind::V1 => true,
RealtimeSessionKind::V2 => {
if message.starts_with(ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX) {
warn!(
"realtime rejected response.create because a response is already in progress; deferring follow-up response.create"
);
response_create_state.pending_response_create = true;
response_create_state.response_in_progress = true;
forward_event = false;
false
} else {
true
}
}
},
RealtimeEvent::Error(_) => true,
RealtimeEvent::SessionUpdated { .. }
| RealtimeEvent::InputTranscriptDelta(_)
| RealtimeEvent::OutputTranscriptDelta(_)
| RealtimeEvent::ConversationItemAdded(_)
| RealtimeEvent::ConversationItemDone { .. } => false,
};
if forward_event && events_tx.send(event).await.is_err() {
if events_tx.send(event).await.is_err() {
anyhow::bail!("realtime output event channel closed");
}
if should_stop {