Propagate safety buffering treatment metadata (#29473)

## Summary

- read the request-scoped safety-buffering treatment from HTTP response
headers and per-turn WebSocket metadata through one shared header parser
- combine that treatment with Responses API safety-buffering signals
- propagate `showBufferingUi` and nullable `fasterModel` through the
existing `model/safetyBuffering/updated` app-server notification
- update the app-server documentation and generated JSON and TypeScript
schemas

The public implementation contains no model mapping or real model
identifier. Tests and protocol examples use generic `current-model` and
`faster-model` placeholders only.

## Dependencies

- server-side treatment evaluation:
https://github.com/openai/openai/pull/1060247
- initial Responses API safety-buffering propagation:
https://github.com/openai/codex/pull/29371
- Codex App UI: https://github.com/openai/openai/pull/1057789

## Validation

- Codex API tests: 129 passed
- focused Codex core safety-buffering integration test passed
- app-server protocol tests passed after regenerating schema fixtures
- Clippy fix and repository formatting completed successfully

The broader app-server run compiled all changed crates and completed
with 1,269 passing tests. Its remaining failures were unrelated
environment limitations: macOS sandbox application was denied, one
expected test binary was unavailable, and several existing subprocess
tests timed out as a result.
This commit is contained in:
Francis Chalissery
2026-06-22 19:51:03 -07:00
committed by GitHub
Unverified
parent 67009bc53f
commit 7c22d376e5
17 changed files with 193 additions and 15 deletions
@@ -2585,6 +2585,12 @@
},
"ModelSafetyBufferingUpdatedNotification": {
"properties": {
"fasterModel": {
"type": [
"string",
"null"
]
},
"model": {
"type": "string"
},
@@ -2594,6 +2600,9 @@
},
"type": "array"
},
"showBufferingUi": {
"type": "boolean"
},
"threadId": {
"type": "string"
},
@@ -2610,6 +2619,7 @@
"required": [
"model",
"reasons",
"showBufferingUi",
"threadId",
"turnId",
"useCases"
@@ -12642,6 +12642,12 @@
"ModelSafetyBufferingUpdatedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"fasterModel": {
"type": [
"string",
"null"
]
},
"model": {
"type": "string"
},
@@ -12651,6 +12657,9 @@
},
"type": "array"
},
"showBufferingUi": {
"type": "boolean"
},
"threadId": {
"type": "string"
},
@@ -12667,6 +12676,7 @@
"required": [
"model",
"reasons",
"showBufferingUi",
"threadId",
"turnId",
"useCases"
@@ -9046,6 +9046,12 @@
"ModelSafetyBufferingUpdatedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"fasterModel": {
"type": [
"string",
"null"
]
},
"model": {
"type": "string"
},
@@ -9055,6 +9061,9 @@
},
"type": "array"
},
"showBufferingUi": {
"type": "boolean"
},
"threadId": {
"type": "string"
},
@@ -9071,6 +9080,7 @@
"required": [
"model",
"reasons",
"showBufferingUi",
"threadId",
"turnId",
"useCases"
@@ -1,6 +1,12 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"fasterModel": {
"type": [
"string",
"null"
]
},
"model": {
"type": "string"
},
@@ -10,6 +16,9 @@
},
"type": "array"
},
"showBufferingUi": {
"type": "boolean"
},
"threadId": {
"type": "string"
},
@@ -26,6 +35,7 @@
"required": [
"model",
"reasons",
"showBufferingUi",
"threadId",
"turnId",
"useCases"
@@ -2,4 +2,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ModelSafetyBufferingUpdatedNotification = { threadId: string, turnId: string, model: string, useCases: Array<string>, reasons: Array<string>, };
export type ModelSafetyBufferingUpdatedNotification = { threadId: string, turnId: string, model: string, useCases: Array<string>, reasons: Array<string>, showBufferingUi: boolean, fasterModel: string | null, };
@@ -3392,9 +3392,11 @@ mod tests {
v2::ModelSafetyBufferingUpdatedNotification {
thread_id: "thr_123".to_string(),
turn_id: "turn_123".to_string(),
model: "gpt-5.4".to_string(),
model: "current-model".to_string(),
use_cases: vec!["cyber".to_string()],
reasons: vec!["user_risk".to_string()],
show_buffering_ui: true,
faster_model: Some("faster-model".to_string()),
},
);
assert_eq!(
@@ -3403,9 +3405,11 @@ mod tests {
"params": {
"threadId": "thr_123",
"turnId": "turn_123",
"model": "gpt-5.4",
"model": "current-model",
"useCases": ["cyber"],
"reasons": ["user_risk"]
"reasons": ["user_risk"],
"showBufferingUi": true,
"fasterModel": "faster-model"
}
}),
serde_json::to_value(&notification)?,
@@ -172,4 +172,6 @@ pub struct ModelSafetyBufferingUpdatedNotification {
pub model: String,
pub use_cases: Vec<String>,
pub reasons: Vec<String>,
pub show_buffering_ui: bool,
pub faster_model: Option<String>,
}
+1 -1
View File
@@ -1355,7 +1355,7 @@ The app-server streams JSON-RPC notifications while a turn is running. Each turn
- `turn/completed``{ turn }` where `turn.status` is `completed`, `interrupted`, or `failed`; failures carry `{ error: { message, codexErrorInfo?, additionalDetails? } }`.
- `turn/diff/updated``{ threadId, turnId, diff }` represents the up-to-date snapshot of the turn-level unified diff, emitted after every FileChange item. `diff` is the latest aggregated unified diff across every file change in the turn. UIs can render this to show the full "what changed" view without stitching individual `fileChange` items.
- `turn/plan/updated``{ turnId, explanation?, plan }` whenever the agent shares or changes its plan; each `plan` entry is `{ step, status }` with `status` in `pending`, `inProgress`, or `completed`.
- `model/safetyBuffering/updated``{ threadId, turnId, model, useCases, reasons }` when a response enters safety buffering. This notification is transient and is not persisted in rollout history.
- `model/safetyBuffering/updated``{ threadId, turnId, model, useCases, reasons, showBufferingUi, fasterModel }` when a response enters safety buffering. `fasterModel` is nullable. This notification is transient and is not persisted in rollout history.
- `model/rerouted``{ threadId, turnId, fromModel, toModel, reason }` when the backend reroutes a request to a different model (for example, due to high-risk cyber safety checks).
- `model/verification``{ threadId, turnId, verifications }` when the backend flags additional account verification, such as `trustedAccessForCyber`.
- `turn/moderationMetadata` — experimental; `{ threadId, turnId, metadata }` when a first-party backend supplies turn-scoped moderation metadata for client-side presentation.
@@ -359,6 +359,8 @@ pub(crate) async fn apply_bespoke_event_handling(
model: event.model,
use_cases: event.use_cases,
reasons: event.reasons,
show_buffering_ui: event.show_buffering_ui,
faster_model: event.faster_model,
};
outgoing
.send_server_notification(ServerNotification::ModelSafetyBufferingUpdated(
+18
View File
@@ -118,6 +118,24 @@ pub enum ResponseEvent {
pub struct SafetyBuffering {
pub use_cases: Vec<String>,
pub reasons: Vec<String>,
#[serde(skip)]
pub show_buffering_ui: bool,
#[serde(skip)]
pub faster_model: Option<String>,
}
impl SafetyBuffering {
pub(crate) fn with_treatment(mut self, treatment: &SafetyBufferingTreatment) -> Self {
self.show_buffering_ui = treatment.show_buffering_ui;
self.faster_model.clone_from(&treatment.faster_model);
self
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct SafetyBufferingTreatment {
pub show_buffering_ui: bool,
pub faster_model: Option<String>,
}
#[derive(Debug, Serialize, Clone, PartialEq)]
@@ -2,9 +2,11 @@ use crate::auth::SharedAuthProvider;
use crate::common::ResponseEvent;
use crate::common::ResponseStream;
use crate::common::ResponsesWsRequest;
use crate::common::SafetyBufferingTreatment;
use crate::error::ApiError;
use crate::provider::Provider;
use crate::rate_limits::parse_rate_limit_event;
use crate::safety_buffering::treatment_from_headers;
use crate::sse::ResponsesStreamEvent;
use crate::sse::process_responses_event;
use crate::telemetry::WebsocketTelemetry;
@@ -595,12 +597,12 @@ fn map_wrapped_websocket_error_event(
Some(ApiError::Transport(TransportError::Http {
status,
url: None,
headers: headers.map(json_headers_to_http_headers),
headers: headers.as_ref().map(json_headers_to_http_headers),
body: Some(original_payload),
}))
}
fn json_headers_to_http_headers(headers: JsonMap<String, Value>) -> HeaderMap {
fn json_headers_to_http_headers(headers: &JsonMap<String, Value>) -> HeaderMap {
let mut mapped = HeaderMap::new();
for (name, value) in headers {
let Ok(header_name) = HeaderName::from_bytes(name.as_bytes()) else {
@@ -614,9 +616,9 @@ fn json_headers_to_http_headers(headers: JsonMap<String, Value>) -> HeaderMap {
mapped
}
fn json_header_value(value: Value) -> Option<HeaderValue> {
fn json_header_value(value: &Value) -> Option<HeaderValue> {
let value = match value {
Value::String(value) => value,
Value::String(value) => value.clone(),
Value::Number(value) => value.to_string(),
Value::Bool(value) => value.to_string(),
_ => return None,
@@ -634,6 +636,7 @@ async fn run_websocket_response_stream(
turn_state: Option<&OnceLock<String>>,
) -> Result<(), ApiError> {
let mut last_server_model: Option<String> = None;
let mut safety_buffering_treatment = SafetyBufferingTreatment::default();
send_websocket_request(
ws_stream,
request_text,
@@ -687,9 +690,17 @@ async fn run_websocket_response_stream(
{
let _ = turn_state.set(response_turn_state);
}
if let Some(headers) = event.headers.as_ref().and_then(Value::as_object)
&& let Some(treatment) =
treatment_from_headers(&json_headers_to_http_headers(headers))
{
safety_buffering_treatment = treatment;
}
let model_verifications = event.model_verifications();
let turn_moderation_metadata = event.turn_moderation_metadata();
let safety_buffering = event.safety_buffering();
let safety_buffering = event
.safety_buffering()
.map(|buffering| buffering.with_treatment(&safety_buffering_treatment));
if event.kind() == "codex.rate_limits" {
if let Some(snapshot) = parse_rate_limit_event(&text) {
let _ = tx_event.send(Ok(ResponseEvent::RateLimits(snapshot))).await;
+1
View File
@@ -8,6 +8,7 @@ pub(crate) mod images;
pub(crate) mod provider;
pub(crate) mod rate_limits;
pub(crate) mod requests;
pub(crate) mod safety_buffering;
pub(crate) mod search;
pub(crate) mod sse;
pub(crate) mod telemetry;
@@ -0,0 +1,54 @@
use crate::common::SafetyBufferingTreatment;
use http::HeaderMap;
pub(crate) const X_CODEX_SAFETY_BUFFERING_ENABLED_HEADER: &str = "x-codex-safety-buffering-enabled";
pub(crate) const X_CODEX_SAFETY_BUFFERING_FASTER_MODEL_HEADER: &str =
"x-codex-safety-buffering-faster-model";
pub(crate) fn treatment_from_headers(headers: &HeaderMap) -> Option<SafetyBufferingTreatment> {
let show_buffering_ui = headers
.get(X_CODEX_SAFETY_BUFFERING_ENABLED_HEADER)
.and_then(|value| value.to_str().ok())?
.eq_ignore_ascii_case("true");
let faster_model = if show_buffering_ui {
headers
.get(X_CODEX_SAFETY_BUFFERING_FASTER_MODEL_HEADER)
.and_then(|value| value.to_str().ok())
.map(str::to_string)
} else {
None
};
Some(SafetyBufferingTreatment {
show_buffering_ui,
faster_model,
})
}
#[cfg(test)]
mod tests {
use super::*;
use http::HeaderValue;
use pretty_assertions::assert_eq;
#[test]
fn reads_treatment_from_http_headers() {
let mut headers = HeaderMap::new();
headers.insert(
X_CODEX_SAFETY_BUFFERING_ENABLED_HEADER,
HeaderValue::from_static("true"),
);
headers.insert(
X_CODEX_SAFETY_BUFFERING_FASTER_MODEL_HEADER,
HeaderValue::from_static("faster-model"),
);
assert_eq!(
treatment_from_headers(&headers),
Some(SafetyBufferingTreatment {
show_buffering_ui: true,
faster_model: Some("faster-model".to_string()),
})
);
}
}
+34 -3
View File
@@ -1,8 +1,10 @@
use crate::common::ResponseEvent;
use crate::common::ResponseStream;
use crate::common::SafetyBuffering;
use crate::common::SafetyBufferingTreatment;
use crate::error::ApiError;
use crate::rate_limits::parse_all_rate_limits;
use crate::safety_buffering::treatment_from_headers;
use crate::telemetry::SseTelemetry;
use codex_client::ByteStream;
use codex_client::StreamResponse;
@@ -55,6 +57,8 @@ pub fn spawn_response_stream(
.get(REQUEST_ID_HEADER)
.and_then(|value| value.to_str().ok())
.map(str::to_string);
let safety_buffering_treatment =
treatment_from_headers(&stream_response.headers).unwrap_or_default();
if let Some(turn_state) = turn_state.as_ref()
&& let Some(header_value) = stream_response
.headers
@@ -79,7 +83,14 @@ pub fn spawn_response_stream(
.send(Ok(ResponseEvent::ServerReasoningIncluded(true)))
.await;
}
process_sse(stream_response.bytes, tx_event, idle_timeout, telemetry).await;
process_sse_with_treatment(
stream_response.bytes,
tx_event,
idle_timeout,
telemetry,
safety_buffering_treatment,
)
.await;
});
ResponseStream {
@@ -149,7 +160,7 @@ struct ResponseCompletedOutputTokensDetails {
pub struct ResponsesStreamEvent {
#[serde(rename = "type")]
pub(crate) kind: String,
headers: Option<Value>,
pub(crate) headers: Option<Value>,
metadata: Option<Value>,
response: Option<Value>,
item: Option<Value>,
@@ -437,11 +448,29 @@ pub fn process_responses_event(
Ok(None)
}
#[cfg(test)]
pub async fn process_sse(
stream: ByteStream,
tx_event: mpsc::Sender<Result<ResponseEvent, ApiError>>,
idle_timeout: Duration,
telemetry: Option<Arc<dyn SseTelemetry>>,
) {
process_sse_with_treatment(
stream,
tx_event,
idle_timeout,
telemetry,
SafetyBufferingTreatment::default(),
)
.await;
}
async fn process_sse_with_treatment(
stream: ByteStream,
tx_event: mpsc::Sender<Result<ResponseEvent, ApiError>>,
idle_timeout: Duration,
telemetry: Option<Arc<dyn SseTelemetry>>,
safety_buffering_treatment: SafetyBufferingTreatment,
) {
let mut stream = stream.eventsource();
let mut response_error: Option<ApiError> = None;
@@ -486,7 +515,9 @@ pub async fn process_sse(
};
let model_verifications = event.model_verifications();
let turn_moderation_metadata = event.turn_moderation_metadata();
let safety_buffering = event.safety_buffering();
let safety_buffering = event
.safety_buffering()
.map(|buffering| buffering.with_treatment(&safety_buffering_treatment));
if let Some(model) = event.response_model()
&& last_server_model.as_deref() != Some(model.as_str())
+2
View File
@@ -2234,6 +2234,8 @@ async fn try_run_sampling_request(
model: turn_context.model_info.slug.clone(),
use_cases: buffering.use_cases,
reasons: buffering.reasons,
show_buffering_ui: buffering.show_buffering_ui,
faster_model: buffering.faster_model,
}),
)
.await;
+13 -2
View File
@@ -5,8 +5,9 @@ use codex_protocol::protocol::SafetyBufferingEvent;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_response_once;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
@@ -15,6 +16,8 @@ use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::json;
const FASTER_MODEL: &str = "faster-model";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn emits_safety_buffering_with_the_requested_model() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
@@ -25,7 +28,13 @@ async fn emits_safety_buffering_with_the_requested_model() -> anyhow::Result<()>
"use_cases": ["cyber"],
"reasons": ["policy-check"],
});
mount_sse_once(&server, sse(vec![created, ev_completed("resp-1")])).await;
mount_response_once(
&server,
sse_response(sse(vec![created, ev_completed("resp-1")]))
.insert_header("x-codex-safety-buffering-enabled", "true")
.insert_header("x-codex-safety-buffering-faster-model", FASTER_MODEL),
)
.await;
let test = test_codex().build(&server).await?;
test.codex
@@ -52,6 +61,8 @@ async fn emits_safety_buffering_with_the_requested_model() -> anyhow::Result<()>
model: test.session_configured.model.clone(),
use_cases: vec!["cyber".to_string()],
reasons: vec!["policy-check".to_string()],
show_buffering_ui: true,
faster_model: Some(FASTER_MODEL.to_string()),
}
);
wait_for_event(&test.codex, |event| {
+2
View File
@@ -1952,6 +1952,8 @@ pub struct SafetyBufferingEvent {
pub model: String,
pub use_cases: Vec<String>,
pub reasons: Vec<String>,
pub show_buffering_ui: bool,
pub faster_model: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]