Remove response.processed websocket request (#26447)

## Why

The Responses websocket client no longer needs to send a follow-up
`response.processed` request after a turn response has already been
recorded. Keeping that extra acknowledgement path adds feature-gated
control flow and a second websocket request shape that no longer carries
useful behavior.

## What Changed

- Removed the `response.processed` websocket request type and sender.
- Removed the `responses_websocket_response_processed` feature flag and
schema entry.
- Removed turn and remote-compaction plumbing that only tracked response
IDs to send the acknowledgement.
- Removed tests that existed solely to cover the deleted feature path.

## Validation

- `just fix -p codex-core -p codex-api -p codex-features`
This commit is contained in:
pakrym-oai
2026-06-04 13:15:50 -07:00
committed by GitHub
Unverified
parent f97d5c3275
commit d312a53e2a
10 changed files with 8 additions and 284 deletions
-7
View File
@@ -239,11 +239,6 @@ pub struct ResponseCreateWsRequest {
pub client_metadata: Option<HashMap<String, String>>,
}
#[derive(Debug, Serialize)]
pub struct ResponseProcessedWsRequest {
pub response_id: String,
}
pub fn response_create_client_metadata(
client_metadata: Option<HashMap<String, String>>,
trace: Option<&W3cTraceContext>,
@@ -272,8 +267,6 @@ pub fn response_create_client_metadata(
pub enum ResponsesWsRequest {
#[serde(rename = "response.create")]
ResponseCreate(ResponseCreateWsRequest),
#[serde(rename = "response.processed")]
ResponseProcessed(ResponseProcessedWsRequest),
}
pub fn create_text_param_for_request(
@@ -1,6 +1,5 @@
use crate::auth::SharedAuthProvider;
use crate::common::ResponseEvent;
use crate::common::ResponseProcessedWsRequest;
use crate::common::ResponseStream;
use crate::common::ResponsesWsRequest;
use crate::error::ApiError;
@@ -206,40 +205,6 @@ impl ResponsesWebsocketConnection {
self.stream.lock().await.is_none()
}
#[instrument(
name = "responses_websocket.send_response_processed",
level = "info",
skip_all,
fields(transport = "responses_websocket", api.path = "responses")
)]
#[expect(
clippy::await_holding_invalid_type,
reason = "the guard serializes exclusive use of the websocket while sending a request frame"
)]
pub async fn send_response_processed(&self, response_id: String) -> Result<(), ApiError> {
let request =
ResponsesWsRequest::ResponseProcessed(ResponseProcessedWsRequest { response_id });
let request_body = serde_json::to_value(&request).map_err(|err| {
ApiError::Stream(format!("failed to encode websocket request: {err}"))
})?;
let mut guard = self.stream.lock().await;
let Some(ws_stream) = guard.as_mut() else {
return Err(ApiError::Stream(
"websocket connection is closed".to_string(),
));
};
send_websocket_request(
ws_stream,
request_body,
self.idle_timeout,
self.telemetry.as_ref(),
/*connection_reused*/ true,
)
.await
}
#[instrument(
name = "responses_websocket.stream_request",
level = "info",
-1
View File
@@ -32,7 +32,6 @@ pub use crate::common::RawMemoryMetadata;
pub use crate::common::Reasoning;
pub use crate::common::ResponseCreateWsRequest;
pub use crate::common::ResponseEvent;
pub use crate::common::ResponseProcessedWsRequest;
pub use crate::common::ResponseStream;
pub use crate::common::ResponsesApiRequest;
pub use crate::common::ResponsesWsRequest;
+1 -7
View File
@@ -572,9 +572,6 @@
"request_rule": {
"type": "boolean"
},
"responses_websocket_response_processed": {
"type": "boolean"
},
"responses_websockets": {
"type": "boolean"
},
@@ -4705,9 +4702,6 @@
"request_rule": {
"type": "boolean"
},
"responses_websocket_response_processed": {
"type": "boolean"
},
"responses_websockets": {
"type": "boolean"
},
@@ -5231,4 +5225,4 @@
},
"title": "ConfigToml",
"type": "object"
}
}
+1 -16
View File
@@ -100,7 +100,6 @@ use tokio::sync::oneshot::error::TryRecvError;
use tokio_tungstenite::tungstenite::Error;
use tokio_tungstenite::tungstenite::Message;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::instrument;
use tracing::trace;
use tracing::warn;
@@ -967,18 +966,6 @@ impl ModelClientSession {
.set_connection_reused(/*connection_reused*/ false);
}
pub(crate) async fn send_response_processed(&self, response_id: &str) {
let Some(connection) = self.websocket_session.connection.as_ref() else {
return;
};
if let Err(err) = connection
.send_response_processed(response_id.to_string())
.await
{
debug!("failed to send response.processed websocket request: {err}");
}
}
#[allow(clippy::too_many_arguments)]
/// Builds shared Responses API transport options and request-body options.
///
@@ -1672,9 +1659,7 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<Head
/// Meant to be called just before sending the request over the socket, to capture realistic
/// transport timing.
fn stamp_ws_stream_request_start_ms(request: &mut ResponsesWsRequest) {
let ResponsesWsRequest::ResponseCreate(payload) = request else {
return;
};
let ResponsesWsRequest::ResponseCreate(payload) = request;
payload
.client_metadata
.get_or_insert_with(HashMap::new)
+5 -20
View File
@@ -26,7 +26,6 @@ use codex_analytics::CompactionImplementation;
use codex_analytics::CompactionPhase;
use codex_analytics::CompactionReason;
use codex_analytics::CompactionTrigger;
use codex_features::Feature;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
@@ -281,7 +280,6 @@ async fn run_remote_compact_task_inner_impl(
);
let RemoteCompactionV2Output {
compaction_output,
response_id,
token_usage,
} = compaction_output_result?;
if let Some(token_usage) = token_usage {
@@ -314,18 +312,11 @@ async fn run_remote_compact_task_inner_impl(
sess.emit_turn_item_completed(turn_context, compaction_item)
.await;
if turn_context
.features
.enabled(Feature::ResponsesWebsocketResponseProcessed)
{
client_session.send_response_processed(&response_id).await;
}
Ok(())
}
struct RemoteCompactionV2Output {
compaction_output: ResponseItem,
response_id: String,
token_usage: Option<TokenUsage>,
}
@@ -409,7 +400,7 @@ async fn collect_compaction_output(
let mut output_item_count = 0usize;
let mut compaction_count = 0usize;
let mut compaction_output = None;
let mut completed_response_id = None;
let mut saw_completed = false;
let mut completed_token_usage = None;
while let Some(event) = stream.next().await {
match event? {
@@ -422,12 +413,8 @@ async fn collect_compaction_output(
}
}
}
ResponseEvent::Completed {
response_id,
token_usage,
..
} => {
completed_response_id = Some(response_id);
ResponseEvent::Completed { token_usage, .. } => {
saw_completed = true;
completed_token_usage = token_usage;
break;
}
@@ -435,12 +422,12 @@ async fn collect_compaction_output(
}
}
let Some(response_id) = completed_response_id else {
if !saw_completed {
return Err(CodexErr::Stream(
"remote compaction v2 stream closed before response.completed".to_string(),
None,
));
};
}
if compaction_count != 1 {
return Err(CodexErr::Fatal(format!(
@@ -453,7 +440,6 @@ async fn collect_compaction_output(
};
Ok(RemoteCompactionV2Output {
compaction_output,
response_id,
token_usage: completed_token_usage,
})
}
@@ -804,7 +790,6 @@ mod tests {
.expect("compaction should be collected");
assert_eq!(output.compaction_output, compaction);
assert_eq!(output.response_id, "resp-compact");
assert_eq!(
output.token_usage,
Some(TokenUsage {
+1 -12
View File
@@ -1827,7 +1827,6 @@ async fn try_run_sampling_request(
!sess.services.extensions.turn_item_contributors().is_empty();
let mut active_item_is_streaming_to_client = false;
let receiving_span = trace_span!("receiving_stream");
let mut completed_response_id: Option<String> = None;
let outcome: CodexResult<SamplingRequestResult> = loop {
let handle_responses = trace_span!(
parent: &receiving_span,
@@ -2071,9 +2070,9 @@ async fn try_run_sampling_request(
sess.services.models_manager.refresh_if_new_etag(etag).await;
}
ResponseEvent::Completed {
response_id,
token_usage,
end_turn,
..
} => {
flush_assistant_text_segments_all(
&sess,
@@ -2089,7 +2088,6 @@ async fn try_run_sampling_request(
if let Some(false) = end_turn {
needs_follow_up = true;
}
completed_response_id = Some(response_id);
break Ok(SamplingRequestResult {
needs_follow_up,
last_agent_message,
@@ -2213,15 +2211,6 @@ async fn try_run_sampling_request(
)
.await;
if sess
.features
.enabled(Feature::ResponsesWebsocketResponseProcessed)
&& outcome.is_ok()
&& let Some(response_id) = completed_response_id.as_deref()
{
client_session.send_response_processed(response_id).await;
}
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
if should_emit_token_count {
@@ -185,168 +185,6 @@ async fn responses_websocket_streams_without_feature_flag_when_provider_supports
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_sends_response_processed_when_feature_enabled() {
skip_if_no_network!();
let server = start_websocket_server(vec![vec![
vec![
ev_response_created("resp-prewarm"),
ev_completed("resp-prewarm"),
],
vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "hi"),
ev_completed("resp-1"),
],
vec![],
]])
.await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::ResponsesWebsocketResponseProcessed)
.expect("test config should allow feature update");
});
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
test.submit_turn("hello")
.await
.expect("submission should send response.processed after processing");
let processed = server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 2)
.await;
assert_eq!(
processed.body_json(),
json!({
"type": "response.processed",
"response_id": "resp-1",
})
);
let connection = server.single_connection();
assert_eq!(connection.len(), 3);
assert_eq!(
connection[1].body_json()["type"].as_str(),
Some("response.create")
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_sends_response_processed_after_remote_compaction_v2() {
skip_if_no_network!();
let server = start_websocket_server(vec![vec![
vec![
ev_response_created("resp-prewarm"),
ev_completed("resp-prewarm"),
],
vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "hi"),
ev_completed("resp-1"),
],
vec![],
vec![
json!({
"type": "response.output_item.done",
"item": {
"type": "compaction",
"encrypted_content": "ENCRYPTED_CONTEXT_COMPACTION_SUMMARY",
}
}),
ev_completed("resp-compact"),
],
vec![],
]])
.await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::RemoteCompactionV2)
.expect("test config should allow feature update");
config
.features
.enable(Feature::ResponsesWebsocketResponseProcessed)
.expect("test config should allow feature update");
});
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
test.submit_turn("hello")
.await
.expect("submission should send response.processed after processing");
test.codex
.submit(Op::Compact)
.await
.expect("compact submission should succeed");
wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
let compact_processed = server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 4)
.await;
assert_eq!(
compact_processed.body_json(),
json!({
"type": "response.processed",
"response_id": "resp-compact",
})
);
let connection = server.single_connection();
assert_eq!(connection.len(), 5);
assert_eq!(
connection[3].body_json()["type"].as_str(),
Some("response.create")
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_omits_response_processed_without_feature() {
skip_if_no_network!();
let server = start_websocket_server(vec![vec![
vec![
ev_response_created("resp-prewarm"),
ev_completed("resp-prewarm"),
],
vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "hi"),
ev_completed("resp-1"),
],
vec![],
]])
.await;
let mut builder = test_codex();
let test = builder
.build_with_websocket_server(&server)
.await
.expect("build websocket codex");
test.submit_turn("hello")
.await
.expect("submission should complete without response.processed");
let connection = server.single_connection();
assert_eq!(connection.len(), 2);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_reuses_connection_with_per_turn_trace_payloads() {
skip_if_no_network!();
-8
View File
@@ -207,8 +207,6 @@ pub enum Feature {
RealtimeConversation,
/// Prevent idle system sleep while a turn is actively running.
PreventIdleSleep,
/// Send `response.processed` over Responses API websockets after a turn response is recorded.
ResponsesWebsocketResponseProcessed,
/// Enable remote compaction v2 over the normal Responses API.
RemoteCompactionV2,
/// Enable workspace dependency support.
@@ -1228,12 +1226,6 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Removed,
default_enabled: false,
},
FeatureSpec {
id: Feature::ResponsesWebsocketResponseProcessed,
key: "responses_websocket_response_processed",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::RemoteCompactionV2,
key: "remote_compaction_v2",
-16
View File
@@ -146,22 +146,6 @@ fn remote_compaction_v2_is_under_development() {
);
}
#[test]
fn responses_websocket_response_processed_is_under_development() {
assert_eq!(
Feature::ResponsesWebsocketResponseProcessed.stage(),
Stage::UnderDevelopment
);
assert_eq!(
Feature::ResponsesWebsocketResponseProcessed.default_enabled(),
false
);
assert_eq!(
feature_for_key("responses_websocket_response_processed"),
Some(Feature::ResponsesWebsocketResponseProcessed)
);
}
#[test]
fn terminal_resize_reflow_is_experimental_and_enabled_by_default() {
assert_eq!(