feat: add remote compaction v2 Responses client path (#20773)

## Why

This adds the `remote_compaction_v2` client path so remote compaction
can run through the normal Responses stream and install a
`context_compaction` item that triggers a compaction.

The goal is to migrate some of the compaction logic on the client side.

We keep the v2 transport behind a feature flag while letting follow-up
requests reuse the compacted context instead of falling back to the
legacy compaction item shape.

## What changed

- add `ResponseItem::ContextCompaction` and refresh the generated
app-server / schema / TypeScript fixtures that expose response items on
the wire
- add `core/src/compact_remote_v2.rs` to send compaction through the
standard streamed Responses client, require exactly one
`context_compaction` output item, and install that item into compacted
history
- route manual compact and auto-compaction through the v2 path when
`remote_compaction_v2` is enabled, while keeping the existing remote
compaction path as the fallback
- preserve the new item type across history retention, follow-up request
construction, telemetry, rollout persistence, and rollout-trace
normalization
- add targeted coverage for the feature flag, `context_compaction`
serialization, rollout-trace normalization, and remote-compaction
follow-up behavior

## Verification

- added protocol tests for `context_compaction`
serialization/deserialization in `protocol/src/models.rs`
- added rollout-trace coverage for `context_compaction` normalization in
`rollout-trace/src/reducer/conversation_tests.rs`
- added remote compaction integration coverage for v2 follow-up reuse
and mixed compaction output streams in
`core/tests/suite/compact_remote.rs`

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
jif-oai
2026-05-04 14:15:01 +02:00
committed by GitHub
Unverified
parent d013155f40
commit d927f61208
27 changed files with 991 additions and 43 deletions
@@ -2850,6 +2850,28 @@
"title": "CompactionResponseItem",
"type": "object"
},
{
"properties": {
"encrypted_content": {
"type": [
"string",
"null"
]
},
"type": {
"enum": [
"context_compaction"
],
"title": "ContextCompactionResponseItemType",
"type": "string"
}
},
"required": [
"type"
],
"title": "ContextCompactionResponseItem",
"type": "object"
},
{
"properties": {
"type": {
@@ -13791,6 +13791,28 @@
"title": "CompactionResponseItem",
"type": "object"
},
{
"properties": {
"encrypted_content": {
"type": [
"string",
"null"
]
},
"type": {
"enum": [
"context_compaction"
],
"title": "ContextCompactionResponseItemType",
"type": "string"
}
},
"required": [
"type"
],
"title": "ContextCompactionResponseItem",
"type": "object"
},
{
"properties": {
"type": {
@@ -10444,6 +10444,28 @@
"title": "CompactionResponseItem",
"type": "object"
},
{
"properties": {
"encrypted_content": {
"type": [
"string",
"null"
]
},
"type": {
"enum": [
"context_compaction"
],
"title": "ContextCompactionResponseItemType",
"type": "string"
}
},
"required": [
"type"
],
"title": "ContextCompactionResponseItem",
"type": "object"
},
{
"properties": {
"type": {
@@ -732,6 +732,28 @@
"title": "CompactionResponseItem",
"type": "object"
},
{
"properties": {
"encrypted_content": {
"type": [
"string",
"null"
]
},
"type": {
"enum": [
"context_compaction"
],
"title": "ContextCompactionResponseItemType",
"type": "string"
}
},
"required": [
"type"
],
"title": "ContextCompactionResponseItem",
"type": "object"
},
{
"properties": {
"type": {
@@ -862,6 +862,28 @@
"title": "CompactionResponseItem",
"type": "object"
},
{
"properties": {
"encrypted_content": {
"type": [
"string",
"null"
]
},
"type": {
"enum": [
"context_compaction"
],
"title": "ContextCompactionResponseItemType",
"type": "string"
}
},
"required": [
"type"
],
"title": "ContextCompactionResponseItem",
"type": "object"
},
{
"properties": {
"type": {
@@ -14,4 +14,4 @@ export type ResponseItem = { "type": "message", role: string, content: Array<Con
/**
* Set when using the Responses API.
*/
call_id: string | null, status: LocalShellStatus, action: LocalShellAction, } | { "type": "function_call", name: string, namespace?: string, arguments: string, call_id: string, } | { "type": "tool_search_call", call_id: string | null, status?: string, execution: string, arguments: unknown, } | { "type": "function_call_output", call_id: string, output: FunctionCallOutputBody, } | { "type": "custom_tool_call", status?: string, call_id: string, name: string, input: string, } | { "type": "custom_tool_call_output", call_id: string, name?: string, output: FunctionCallOutputBody, } | { "type": "tool_search_output", call_id: string | null, status: string, execution: string, tools: unknown[], } | { "type": "web_search_call", status?: string, action?: WebSearchAction, } | { "type": "image_generation_call", id: string, status: string, revised_prompt?: string, result: string, } | { "type": "compaction", encrypted_content: string, } | { "type": "other" };
call_id: string | null, status: LocalShellStatus, action: LocalShellAction, } | { "type": "function_call", name: string, namespace?: string, arguments: string, call_id: string, } | { "type": "tool_search_call", call_id: string | null, status?: string, execution: string, arguments: unknown, } | { "type": "function_call_output", call_id: string, output: FunctionCallOutputBody, } | { "type": "custom_tool_call", status?: string, call_id: string, name: string, input: string, } | { "type": "custom_tool_call_output", call_id: string, name?: string, output: FunctionCallOutputBody, } | { "type": "tool_search_output", call_id: string | null, status: string, execution: string, tools: unknown[], } | { "type": "web_search_call", status?: string, action?: WebSearchAction, } | { "type": "image_generation_call", id: string, status: string, revised_prompt?: string, result: string, } | { "type": "compaction", encrypted_content: string, } | { "type": "context_compaction", encrypted_content?: string, } | { "type": "other" };
+6
View File
@@ -496,6 +496,9 @@
"realtime_conversation": {
"type": "boolean"
},
"remote_compaction_v2": {
"type": "boolean"
},
"remote_control": {
"type": "boolean"
},
@@ -3962,6 +3965,9 @@
"realtime_conversation": {
"type": "boolean"
},
"remote_compaction_v2": {
"type": "boolean"
},
"remote_control": {
"type": "boolean"
},
+1
View File
@@ -115,6 +115,7 @@ fn keep_forked_rollout_item(item: &RolloutItem) -> bool {
| ResponseItem::WebSearchCall { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::Compaction { .. }
| ResponseItem::ContextCompaction { .. }
| ResponseItem::Other,
) => false,
// A forked child gets its own runtime config, including spawned-agent
+1
View File
@@ -384,6 +384,7 @@ fn build_arc_monitor_message_item(
| ResponseItem::ToolSearchOutput { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::Compaction { .. }
| ResponseItem::ContextCompaction { .. }
| ResponseItem::Other => None,
}
}
+5
View File
@@ -471,6 +471,11 @@ impl ModelClient {
if let Ok(header_value) = HeaderValue::from_str(&self.state.installation_id) {
extra_headers.insert(X_CODEX_INSTALLATION_ID_HEADER, header_value);
}
extra_headers.extend(build_responses_headers(
self.state.beta_features_header.as_deref(),
/*turn_state*/ None,
/*turn_metadata_header*/ None,
));
extra_headers.extend(self.build_responses_identity_headers());
extra_headers.extend(build_conversation_headers(Some(
self.state.conversation_id.to_string(),
+7 -1
View File
@@ -420,7 +420,13 @@ pub(crate) fn insert_initial_context_before_last_real_user_or_summary(
.iter()
.enumerate()
.rev()
.find_map(|(i, item)| matches!(item, ResponseItem::Compaction { .. }).then_some(i));
.find_map(|(i, item)| {
matches!(
item,
ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. }
)
.then_some(i)
});
let insertion_index = last_real_user_index
.or(last_user_or_summary_index)
.or(last_compaction_index);
+5 -5
View File
@@ -268,7 +268,7 @@ fn should_keep_compacted_history_item(item: &ResponseItem) -> bool {
}
ResponseItem::Message { role, .. } if role == "assistant" => true,
ResponseItem::Message { .. } => false,
ResponseItem::Compaction { .. } => true,
ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. } => true,
ResponseItem::Reasoning { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
@@ -284,11 +284,11 @@ fn should_keep_compacted_history_item(item: &ResponseItem) -> bool {
}
#[derive(Debug)]
struct CompactRequestLogData {
pub(crate) struct CompactRequestLogData {
failing_compaction_request_model_visible_bytes: i64,
}
fn build_compact_request_log_data(
pub(crate) fn build_compact_request_log_data(
input: &[ResponseItem],
instructions: &str,
) -> CompactRequestLogData {
@@ -305,7 +305,7 @@ fn build_compact_request_log_data(
}
}
fn log_remote_compact_failure(
pub(crate) fn log_remote_compact_failure(
turn_context: &TurnContext,
log_data: &CompactRequestLogData,
total_usage_breakdown: TotalTokenUsageBreakdown,
@@ -324,7 +324,7 @@ fn log_remote_compact_failure(
);
}
fn trim_function_call_history_to_fit_context_window(
pub(crate) fn trim_function_call_history_to_fit_context_window(
history: &mut ContextManager,
turn_context: &TurnContext,
base_instructions: &BaseInstructions,
+448
View File
@@ -0,0 +1,448 @@
use std::collections::HashSet;
use std::sync::Arc;
use crate::Prompt;
use crate::ResponseStream;
use crate::client::ModelClientSession;
use crate::client_common::ResponseEvent;
use crate::compact::CompactionAnalyticsAttempt;
use crate::compact::InitialContextInjection;
use crate::compact::compaction_status_from_result;
use crate::compact_remote::build_compact_request_log_data;
use crate::compact_remote::log_remote_compact_failure;
use crate::compact_remote::process_compacted_history;
use crate::compact_remote::trim_function_call_history_to_fit_context_window;
use crate::session::session::Session;
use crate::session::turn::built_tools;
use crate::session::turn_context::TurnContext;
use codex_analytics::CompactionImplementation;
use codex_analytics::CompactionPhase;
use codex_analytics::CompactionReason;
use codex_analytics::CompactionTrigger;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_rollout_trace::CompactionCheckpointTracePayload;
use codex_rollout_trace::InferenceTraceContext;
use futures::StreamExt;
use futures::TryFutureExt;
use tokio_util::sync::CancellationToken;
use tracing::info;
/// Runs remote compaction v2 as an automatic compaction (`CompactionTrigger::Auto`),
/// sending the request on the caller's `client_session`.
///
/// `initial_context_injection`, `reason`, and `phase` are forwarded unchanged to
/// `run_remote_compact_task_inner`, and its result is returned as-is.
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
client_session: &mut ModelClientSession,
initial_context_injection: InitialContextInjection,
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<()> {
run_remote_compact_task_inner(
&sess,
&turn_context,
Some(client_session),
initial_context_injection,
CompactionTrigger::Auto,
reason,
phase,
)
.await
}
/// Runs a manual, user-requested remote compaction v2 as a standalone turn.
///
/// Emits a `TurnStarted` event for `turn_context` first, then delegates to
/// `run_remote_compact_task_inner` with no caller-provided client session,
/// `InitialContextInjection::DoNotInject`, `CompactionTrigger::Manual`,
/// `CompactionReason::UserRequested`, and `CompactionPhase::StandaloneTurn`.
pub(crate) async fn run_remote_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) -> CodexResult<()> {
// Announce the turn before any compaction work starts.
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_context.sub_id.clone(),
started_at: turn_context.turn_timing_state.started_at_unix_secs().await,
model_context_window: turn_context.model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_remote_compact_task_inner(
&sess,
&turn_context,
/*client_session*/ None,
InitialContextInjection::DoNotInject,
CompactionTrigger::Manual,
CompactionReason::UserRequested,
CompactionPhase::StandaloneTurn,
)
.await
}
async fn run_remote_compact_task_inner(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
client_session: Option<&mut ModelClientSession>,
initial_context_injection: InitialContextInjection,
trigger: CompactionTrigger,
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<()> {
let attempt = CompactionAnalyticsAttempt::begin(
sess.as_ref(),
turn_context.as_ref(),
trigger,
reason,
CompactionImplementation::Responses,
phase,
)
.await;
let result = run_remote_compact_task_inner_impl(
sess,
turn_context,
client_session,
initial_context_injection,
)
.await;
attempt
.track(
sess.as_ref(),
compaction_status_from_result(&result),
result.as_ref().err().map(ToString::to_string),
)
.await;
if let Err(err) = result {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
sess.send_event(turn_context, event).await;
return Err(err);
}
Ok(())
}
/// Performs one remote compaction v2 attempt and installs the result as the
/// session's new history.
///
/// Steps, in order:
/// 1. Emit a "started" notification for a fresh `ContextCompaction` turn item.
/// 2. Clone the session history and trim it with
///    `trim_function_call_history_to_fit_context_window`.
/// 3. Build a prompt whose input is the trimmed history followed by a
///    `context_compaction` item with no encrypted content.
/// 4. Send the request, on `client_session` when provided or on a newly
///    created session otherwise, and record the result on the trace attempt.
/// 5. Build replacement history from the prompt input plus the returned item,
///    run it through `process_compacted_history`, install it on the session,
///    recompute token usage, and emit the "completed" turn item.
///
/// # Errors
///
/// Returns early with the error from `built_tools` or from the compaction
/// request. In both cases the history is left untouched and the "completed"
/// turn item is not emitted.
async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
client_session: Option<&mut ModelClientSession>,
initial_context_injection: InitialContextInjection,
) -> CodexResult<()> {
let context_compaction_item = ContextCompactionItem::new();
// The trace context is keyed by the turn id and the new item's id.
let compaction_trace = sess.services.rollout_thread_trace.compaction_trace_context(
turn_context.sub_id.as_str(),
context_compaction_item.id.as_str(),
turn_context.model_info.slug.as_str(),
turn_context.provider.info().name.as_str(),
);
let compaction_item = TurnItem::ContextCompaction(context_compaction_item);
sess.emit_turn_item_started(turn_context, &compaction_item)
.await;
// Work on a clone so the session history is only changed once the
// replacement is installed below.
let mut history = sess.clone_history().await;
let base_instructions = sess.get_base_instructions().await;
let deleted_items = trim_function_call_history_to_fit_context_window(
&mut history,
turn_context.as_ref(),
&base_instructions,
);
if deleted_items > 0 {
info!(
turn_id = %turn_context.sub_id,
deleted_items,
"trimmed history items before remote compaction v2"
);
}
// Snapshot of the (trimmed) raw history, reported to the trace on install.
let trace_input_history = history.raw_items().to_vec();
let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities);
let tool_router = built_tools(
sess.as_ref(),
turn_context.as_ref(),
&prompt_input,
&HashSet::new(),
/*skills_outcome*/ None,
&CancellationToken::new(),
)
.await?;
// `prompt_input` is kept as-is for building the replacement history; only
// the request input gets the trailing `context_compaction` item.
let mut input = prompt_input.clone();
input.push(ResponseItem::ContextCompaction {
encrypted_content: None,
});
let prompt = Prompt {
input,
tools: tool_router.model_visible_specs(),
parallel_tool_calls: turn_context.model_info.supports_parallel_tool_calls,
base_instructions,
personality: turn_context.personality,
output_schema: None,
output_schema_strict: true,
};
let turn_metadata_header = turn_context.turn_metadata_state.current_header_value();
let trace_attempt = compaction_trace.start_attempt(&serde_json::json!({
"model": turn_context.model_info.slug.as_str(),
"instructions": prompt.base_instructions.text.as_str(),
"input": &prompt.input,
"parallel_tool_calls": prompt.parallel_tool_calls,
}));
// Use the caller's client session when one was passed in; otherwise create
// a new session just for this request.
let compaction_output_result = if let Some(client_session) = client_session {
run_remote_compaction_request_v2(
sess,
turn_context,
client_session,
&prompt,
turn_metadata_header.as_deref(),
)
.await
} else {
let mut owned_client_session = sess.services.model_client.new_session();
run_remote_compaction_request_v2(
sess,
turn_context,
&mut owned_client_session,
&prompt,
turn_metadata_header.as_deref(),
)
.await
};
// Record the outcome before `?` so failed attempts are traced as well.
trace_attempt.record_result(compaction_output_result.as_ref().map(std::slice::from_ref));
let compaction_output = compaction_output_result?;
let compacted_history = build_v2_compacted_history(&prompt_input, compaction_output);
let new_history = process_compacted_history(
sess.as_ref(),
turn_context.as_ref(),
compacted_history,
initial_context_injection,
)
.await;
// A reference context item is only supplied when initial context is injected.
let reference_context_item = match initial_context_injection {
InitialContextInjection::DoNotInject => None,
InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()),
};
let compacted_item = CompactedItem {
message: String::new(),
replacement_history: Some(new_history.clone()),
};
compaction_trace.record_installed(&CompactionCheckpointTracePayload {
input_history: &trace_input_history,
replacement_history: &new_history,
});
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
.await;
sess.recompute_token_usage(turn_context).await;
sess.emit_turn_item_completed(turn_context, compaction_item)
.await;
Ok(())
}
/// Sends `prompt` through `client_session.stream` and returns the
/// `context_compaction` item collected from the resulting stream.
///
/// The request is made with `InferenceTraceContext::disabled()`.
///
/// # Errors
///
/// If opening the stream fails, the failure is logged via
/// `log_remote_compact_failure` (with the session's total token usage
/// breakdown and log data built from the prompt input and instructions) and
/// the original error is returned. Errors from
/// `collect_context_compaction_output` are returned unchanged.
async fn run_remote_compaction_request_v2(
sess: &Session,
turn_context: &TurnContext,
client_session: &mut ModelClientSession,
prompt: &Prompt,
turn_metadata_header: Option<&str>,
) -> CodexResult<ResponseItem> {
let stream = client_session
.stream(
prompt,
&turn_context.model_info,
&turn_context.session_telemetry,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
turn_context.config.service_tier,
turn_metadata_header,
&InferenceTraceContext::disabled(),
)
// Log-and-rethrow: the error itself is passed through untouched.
.or_else(|err| async {
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
let compact_request_log_data =
build_compact_request_log_data(&prompt.input, &prompt.base_instructions.text);
log_remote_compact_failure(
turn_context,
&compact_request_log_data,
total_usage_breakdown,
&err,
);
Err(err)
})
.await?;
collect_context_compaction_output(stream).await
}
/// Drains `stream` until `Completed` and returns its single
/// `context_compaction` output item.
///
/// Output items of any other type are counted but otherwise ignored, as are
/// all events other than `OutputItemDone` and `Completed`.
///
/// # Errors
///
/// - Any error event yielded by the stream is returned as-is.
/// - `CodexErr::Fatal` if a `context_compaction` item arrives without
///   `encrypted_content`.
/// - `CodexErr::Fatal` if the stream ends before a `Completed` event.
/// - `CodexErr::Fatal` if the number of `context_compaction` items seen is
///   not exactly one.
async fn collect_context_compaction_output(
    mut stream: ResponseStream,
) -> CodexResult<ResponseItem> {
    let mut output_item_count = 0usize;
    let mut context_compaction_count = 0usize;
    let mut first_context_compaction: Option<ResponseItem> = None;
    let mut saw_completed = false;

    while let Some(event) = stream.next().await {
        let item = match event? {
            ResponseEvent::OutputItemDone(item) => item,
            ResponseEvent::Completed { .. } => {
                saw_completed = true;
                break;
            }
            _ => continue,
        };
        output_item_count += 1;

        let ResponseItem::ContextCompaction { encrypted_content } = &item else {
            continue;
        };
        if encrypted_content.is_none() {
            return Err(CodexErr::Fatal(
                "remote compaction v2 returned context_compaction without encrypted_content"
                    .to_string(),
            ));
        }

        context_compaction_count += 1;
        // Only the first item is kept; extras just bump the count so the
        // exactly-one check below can reject the response.
        if first_context_compaction.is_none() {
            first_context_compaction = Some(item);
        }
    }

    if !saw_completed {
        return Err(CodexErr::Fatal(
            "remote compaction v2 stream closed before response.completed".to_string(),
        ));
    }
    if context_compaction_count != 1 {
        return Err(CodexErr::Fatal(format!(
            "remote compaction v2 expected exactly one context_compaction output item, got {context_compaction_count} from {output_item_count} output items"
        )));
    }

    match first_context_compaction {
        Some(context_compaction_output) => Ok(context_compaction_output),
        None => unreachable!("context compaction output must exist when count is exactly one"),
    }
}
/// Builds the post-compaction history: every item of `prompt_input` accepted
/// by `is_retained_for_remote_compaction_v2` (cloned, original order kept),
/// followed by `compaction_output` as the final element.
fn build_v2_compacted_history(
    prompt_input: &[ResponseItem],
    compaction_output: ResponseItem,
) -> Vec<ResponseItem> {
    prompt_input
        .iter()
        .filter(|item| is_retained_for_remote_compaction_v2(item))
        .cloned()
        .chain(std::iter::once(compaction_output))
        .collect()
}
/// Returns `true` only for `Message` items whose role is `user`, `developer`,
/// or `system`; every other item (including messages with other roles) is
/// dropped from the compacted history.
fn is_retained_for_remote_compaction_v2(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::Message { role, .. } => {
            ["user", "developer", "system"].contains(&role.as_str())
        }
        _ => false,
    }
}
// Unit tests for the history-retention filter and the stream collector.
#[cfg(test)]
mod tests {
use super::*;
use codex_protocol::models::ContentItem;
use codex_protocol::models::MessagePhase;
use pretty_assertions::assert_eq;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Builds a `Message` item with the given role, a single `InputText`
/// content entry, and the given phase.
fn message(role: &str, text: &str, phase: Option<MessagePhase>) -> ResponseItem {
ResponseItem::Message {
id: None,
role: role.to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
phase,
}
}
/// Builds a `ResponseStream` pre-loaded with `events`. The sender is dropped
/// before returning, so the stream ends after the last queued event.
fn response_stream(events: Vec<CodexResult<ResponseEvent>>) -> ResponseStream {
// Capacity covers every event (and is at least 1), so `try_send` below
// is expected to succeed without a receiver running.
let (tx_event, rx_event) = mpsc::channel(events.len().max(1));
for event in events {
tx_event
.try_send(event)
.expect("response stream test channel should have capacity");
}
drop(tx_event);
ResponseStream {
rx_event,
consumer_dropped: CancellationToken::new(),
}
}
/// Only developer/system/user messages survive, in their original order,
/// followed by the new compaction output. Assistant messages, the function
/// call, and the old `Compaction` item are dropped.
#[test]
fn build_v2_compacted_history_matches_prod_retention_shape() {
let input = vec![
message("developer", "dev", /*phase*/ None),
message("system", "sys", /*phase*/ None),
message("user", "user", /*phase*/ None),
message("assistant", "commentary", Some(MessagePhase::Commentary)),
message("assistant", "final", Some(MessagePhase::FinalAnswer)),
ResponseItem::FunctionCall {
id: None,
name: "shell".to_string(),
namespace: None,
arguments: "{}".to_string(),
call_id: "call_1".to_string(),
},
ResponseItem::Compaction {
encrypted_content: "old".to_string(),
},
];
let output = ResponseItem::ContextCompaction {
encrypted_content: Some("new".to_string()),
};
let history = build_v2_compacted_history(&input, output.clone());
assert_eq!(
history,
vec![
message("developer", "dev", /*phase*/ None),
message("system", "sys", /*phase*/ None),
message("user", "user", /*phase*/ None),
output,
]
);
}
/// A non-compaction output item ahead of the `context_compaction` item is
/// ignored, and the compaction item is still returned.
#[tokio::test]
async fn collect_context_compaction_output_accepts_additional_output_items() {
let context_compaction = ResponseItem::ContextCompaction {
encrypted_content: Some("encrypted".to_string()),
};
let stream = response_stream(vec![
Ok(ResponseEvent::OutputItemDone(message(
"assistant",
"IGNORED_COMPACT_REPLY",
Some(MessagePhase::FinalAnswer),
))),
Ok(ResponseEvent::OutputItemDone(context_compaction.clone())),
Ok(ResponseEvent::Completed {
response_id: "resp-compact".to_string(),
token_usage: None,
end_turn: Some(true),
}),
]);
let output = collect_context_compaction_output(stream)
.await
.expect("context compaction should be collected");
assert_eq!(output, context_compaction);
}
}
+8 -2
View File
@@ -400,6 +400,7 @@ impl ContextManager {
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::Compaction { .. }
| ResponseItem::ContextCompaction { .. }
| ResponseItem::Other => item.clone(),
}
}
@@ -487,7 +488,8 @@ fn is_api_message(message: &ResponseItem) -> bool {
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::Compaction { .. } => true,
| ResponseItem::Compaction { .. }
| ResponseItem::ContextCompaction { .. } => true,
ResponseItem::Other => false,
}
}
@@ -535,6 +537,9 @@ pub(crate) fn estimate_response_item_model_visible_bytes(item: &ResponseItem) ->
}
| ResponseItem::Compaction {
encrypted_content: content,
}
| ResponseItem::ContextCompaction {
encrypted_content: Some(content),
} => i64::try_from(estimate_reasoning_length(content.len())).unwrap_or(i64::MAX),
item => {
let raw = serde_json::to_string(item)
@@ -681,7 +686,8 @@ fn is_model_generated_item(item: &ResponseItem) -> bool {
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::Compaction { .. } => true,
| ResponseItem::Compaction { .. }
| ResponseItem::ContextCompaction { .. } => true,
ResponseItem::FunctionCallOutput { .. }
| ResponseItem::ToolSearchOutput { .. }
| ResponseItem::CustomToolCallOutput { .. }
+1
View File
@@ -17,6 +17,7 @@ pub(crate) mod session;
pub use session::SteerInputError;
mod codex_thread;
mod compact_remote;
mod compact_remote_v2;
mod config_lock;
pub use codex_thread::CodexThread;
pub use codex_thread::CodexThreadTurnContextOverrides;
+4 -3
View File
@@ -875,9 +875,10 @@ impl Session {
let beta_features_header = FEATURES
.iter()
.filter_map(|spec| {
if spec.stage.experimental_menu_description().is_some()
&& config.features.enabled(spec.id)
{
let advertise_in_model_client_header =
spec.stage.experimental_menu_description().is_some()
|| spec.id == Feature::RemoteCompactionV2;
if advertise_in_model_client_header && config.features.enabled(spec.id) {
Some(spec.key)
} else {
None
+51 -21
View File
@@ -16,6 +16,7 @@ use crate::compact::collect_user_messages;
use crate::compact::run_inline_auto_compact_task;
use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::compact_remote_v2::run_inline_remote_auto_compact_task as run_inline_remote_auto_compact_task_v2;
use crate::connectors;
use crate::context::ContextualUserFragment;
use crate::feedback_tags;
@@ -147,19 +148,21 @@ pub(crate) async fn run_turn(
let model_info = turn_context.model_info.clone();
let auto_compact_limit = model_info.auto_compact_token_limit().unwrap_or(i64::MAX);
let mut prewarmed_client_session = prewarmed_client_session;
let mut client_session =
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
// TODO(ccunningham): Pre-turn compaction runs before context updates and the
// new user message are recorded. Estimate pending incoming items (context
// diffs/full reinjection + user input) and trigger compaction preemptively
// when they would push the thread over the compaction threshold.
let pre_sampling_compacted = match run_pre_sampling_compact(&sess, &turn_context).await {
Ok(pre_sampling_compacted) => pre_sampling_compacted,
Err(_) => {
error!("Failed to run pre-sampling compact");
return None;
}
};
if pre_sampling_compacted && let Some(mut client_session) = prewarmed_client_session.take() {
let pre_sampling_compact =
match run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await {
Ok(pre_sampling_compact) => pre_sampling_compact,
Err(_) => {
error!("Failed to run pre-sampling compact");
return None;
}
};
if pre_sampling_compact.reset_client_session {
client_session.reset_websocket_session();
}
@@ -365,8 +368,6 @@ pub(crate) async fn run_turn(
// `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse
// one instance across retries within this turn.
let mut client_session =
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
// Pending input is drained into history before building the next model request.
// However, we defer that drain until after sampling in two cases:
// 1. At the start of a turn, so the fresh user prompt in `input` gets sampled first.
@@ -484,19 +485,22 @@ pub(crate) async fn run_turn(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
if run_auto_compact(
let reset_client_session = match run_auto_compact(
&sess,
&turn_context,
&mut client_session,
InitialContextInjection::BeforeLastUserMessage,
CompactionReason::ContextLimit,
CompactionPhase::MidTurn,
)
.await
.is_err()
{
return None;
Ok(reset_client_session) => reset_client_session,
Err(_) => return None,
};
if reset_client_session {
client_session.reset_websocket_session();
}
client_session.reset_websocket_session();
can_drain_pending_input = !model_needs_follow_up;
continue;
}
@@ -707,17 +711,24 @@ async fn track_turn_resolved_config_analytics(
});
}
struct PreSamplingCompactResult {
reset_client_session: bool,
}
async fn run_pre_sampling_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<bool> {
client_session: &mut ModelClientSession,
) -> CodexResult<PreSamplingCompactResult> {
let total_usage_tokens_before_compaction = sess.get_total_token_usage().await;
let mut pre_sampling_compacted = maybe_run_previous_model_inline_compact(
sess,
turn_context,
client_session,
total_usage_tokens_before_compaction,
)
.await?;
let mut reset_client_session = pre_sampling_compacted;
let total_usage_tokens = sess.get_total_token_usage().await;
let auto_compact_limit = turn_context
.model_info
@@ -725,9 +736,10 @@ async fn run_pre_sampling_compact(
.unwrap_or(i64::MAX);
// Compact if the total usage tokens are greater than the auto compact limit
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(
reset_client_session |= run_auto_compact(
sess,
turn_context,
client_session,
InitialContextInjection::DoNotInject,
CompactionReason::ContextLimit,
CompactionPhase::PreTurn,
@@ -735,7 +747,9 @@ async fn run_pre_sampling_compact(
.await?;
pre_sampling_compacted = true;
}
Ok(pre_sampling_compacted)
Ok(PreSamplingCompactResult {
reset_client_session: pre_sampling_compacted && reset_client_session,
})
}
/// Runs pre-sampling compaction against the previous model when switching to a smaller
@@ -747,6 +761,7 @@ async fn run_pre_sampling_compact(
async fn maybe_run_previous_model_inline_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
client_session: &mut ModelClientSession,
total_usage_tokens: i64,
) -> CodexResult<bool> {
let Some(previous_turn_settings) = sess.previous_turn_settings().await else {
@@ -772,9 +787,10 @@ async fn maybe_run_previous_model_inline_compact(
&& previous_model_turn_context.model_info.slug != turn_context.model_info.slug
&& old_context_window > new_context_window;
if should_run {
run_auto_compact(
let _ = run_auto_compact(
sess,
&previous_model_turn_context,
client_session,
InitialContextInjection::DoNotInject,
CompactionReason::ModelDownshift,
CompactionPhase::PreTurn,
@@ -788,11 +804,24 @@ async fn maybe_run_previous_model_inline_compact(
async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
client_session: &mut ModelClientSession,
initial_context_injection: InitialContextInjection,
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<()> {
) -> CodexResult<bool> {
if should_use_remote_compact_task(turn_context.provider.info()) {
if turn_context.features.enabled(Feature::RemoteCompactionV2) {
run_inline_remote_auto_compact_task_v2(
Arc::clone(sess),
Arc::clone(turn_context),
client_session,
initial_context_injection,
reason,
phase,
)
.await?;
return Ok(false);
}
run_inline_remote_auto_compact_task(
Arc::clone(sess),
Arc::clone(turn_context),
@@ -811,7 +840,7 @@ async fn run_auto_compact(
)
.await?;
}
Ok(())
Ok(true)
}
pub(super) fn collect_explicit_app_ids_from_skill_items(
@@ -1958,6 +1987,7 @@ async fn try_run_sampling_request(
| ResponseItem::WebSearchCall { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::Compaction { .. }
| ResponseItem::ContextCompaction { .. }
| ResponseItem::Other => false,
};
+8 -1
View File
@@ -33,7 +33,14 @@ impl SessionTask for CompactTask {
/*inc*/ 1,
&[("type", "remote")],
);
crate::compact_remote::run_remote_compact_task(session.clone(), ctx).await
if ctx
.features
.enabled(codex_features::Feature::RemoteCompactionV2)
{
crate::compact_remote_v2::run_remote_compact_task(session.clone(), ctx).await
} else {
crate::compact_remote::run_remote_compact_task(session.clone(), ctx).await
}
} else {
session.services.session_telemetry.counter(
"codex.task.compact",
+2 -1
View File
@@ -186,7 +186,8 @@ fn response_item_records_turn_ttft(item: &ResponseItem) -> bool {
| ResponseItem::ToolSearchCall { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::Compaction { .. } => true,
| ResponseItem::Compaction { .. }
| ResponseItem::ContextCompaction { .. } => true,
ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::ToolSearchOutput { .. }
+198
View File
@@ -5,6 +5,7 @@ use std::path::PathBuf;
use anyhow::Result;
use codex_core::compact::SUMMARY_PREFIX;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::items::TurnItem;
@@ -411,6 +412,203 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
Ok(())
}
/// End-to-end check of the v2 remote compaction path with `Feature::RemoteCompactionV2` enabled.
///
/// The mock server answers three requests in order: a normal turn, a compaction stream that
/// yields a single `context_compaction` item, and a follow-up turn. The test then asserts that:
/// - the compact request carries `remote_compaction_v2` in `x-codex-beta-features`, targets
///   `/v1/responses`, and contains a `context_compaction` item without the encrypted payload;
/// - the follow-up request contains the `context_compaction` item, its encrypted payload, and
///   the original user message.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_v2_reuses_context_compaction_for_followups() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let builder = test_codex()
        .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
        .with_config(|config| {
            let _ = config.features.enable(Feature::RemoteCompactionV2);
        });
    let harness = TestCodexHarness::with_builder(builder).await?;
    let codex = harness.test().codex.clone();

    // One SSE stream per expected request, served in submission order.
    let first_turn_stream = responses::sse(vec![
        responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
        responses::ev_completed("resp-1"),
    ]);
    let compaction_item_done = serde_json::json!({
        "type": "response.output_item.done",
        "item": {
            "type": "context_compaction",
            "encrypted_content": "ENCRYPTED_CONTEXT_COMPACTION_SUMMARY",
        }
    });
    let compact_stream = responses::sse(vec![
        compaction_item_done,
        responses::ev_completed("resp-compact"),
    ]);
    let follow_up_stream = responses::sse(vec![
        responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
        responses::ev_completed("resp-2"),
    ]);
    let responses_mock = responses::mount_sse_sequence(
        harness.server(),
        vec![first_turn_stream, compact_stream, follow_up_stream],
    )
    .await;

    // Turn 1: seed the history with a user message.
    let first_turn = Op::UserInput {
        environments: None,
        items: vec![UserInput::Text {
            text: "hello remote compact".into(),
            text_elements: Vec::new(),
        }],
        final_output_json_schema: None,
        responsesapi_client_metadata: None,
    };
    codex.submit(first_turn).await?;
    wait_for_turn_complete(&codex).await;

    // Manual compaction.
    codex.submit(Op::Compact).await?;
    wait_for_turn_complete(&codex).await;

    // Turn 2: the request built here is the one inspected as the follow-up.
    let second_turn = Op::UserInput {
        environments: None,
        items: vec![UserInput::Text {
            text: "after compact".into(),
            text_elements: Vec::new(),
        }],
        final_output_json_schema: None,
        responsesapi_client_metadata: None,
    };
    codex.submit(second_turn).await?;
    wait_for_turn_complete(&codex).await;

    let response_requests = responses_mock.requests();

    // The second recorded request is the compaction request.
    let compact_request = &response_requests[1];
    let advertises_v2 = compact_request
        .header("x-codex-beta-features")
        .as_deref()
        .is_some_and(|header_value| {
            header_value
                .split(',')
                .any(|entry| entry == "remote_compaction_v2")
        });
    assert!(
        advertises_v2,
        "expected compact request to advertise the remote_compaction_v2 beta feature"
    );
    assert_eq!(compact_request.path(), "/v1/responses");

    let compact_body = compact_request.body_json().to_string();
    let has_trigger_item = compact_body.contains("\"type\":\"context_compaction\"");
    assert!(
        has_trigger_item,
        "expected v2 compaction request to include the context_compaction trigger item"
    );
    let trigger_has_payload = compact_body.contains("ENCRYPTED_CONTEXT_COMPACTION_SUMMARY");
    assert!(
        !trigger_has_payload,
        "expected v2 compaction trigger item to omit encrypted_content"
    );

    let follow_up_request = response_requests.last().expect("follow-up request missing");
    let follow_up_body = follow_up_request.body_json().to_string();
    let keeps_compaction_item = follow_up_body.contains("\"type\":\"context_compaction\"");
    assert!(
        keeps_compaction_item,
        "expected follow-up request to preserve the v2 context_compaction item"
    );
    let keeps_payload = follow_up_body.contains("ENCRYPTED_CONTEXT_COMPACTION_SUMMARY");
    assert!(
        keeps_payload,
        "expected follow-up request to include the context compaction payload"
    );
    let keeps_user_message = follow_up_body.contains("hello remote compact");
    assert!(
        keeps_user_message,
        "expected v2 follow-up request to preserve retained original user messages"
    );

    Ok(())
}
/// Checks v2 remote compaction when the compaction stream carries an extra output item.
///
/// The compaction stream emits an assistant message (`IGNORED_COMPACT_REPLY`) before the
/// `context_compaction` item. The follow-up request must contain the `context_compaction` item
/// and its encrypted payload, and must not contain the extra assistant message text.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_v2_accepts_additional_output_items_before_context_compaction() -> Result<()>
{
    skip_if_no_network!(Ok(()));

    let builder = test_codex()
        .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
        .with_config(|config| {
            let _ = config.features.enable(Feature::RemoteCompactionV2);
        });
    let harness = TestCodexHarness::with_builder(builder).await?;
    let codex = harness.test().codex.clone();

    // One SSE stream per expected request, served in submission order.
    let first_turn_stream = responses::sse(vec![
        responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
        responses::ev_completed("resp-1"),
    ]);
    let compaction_item_done = serde_json::json!({
        "type": "response.output_item.done",
        "item": {
            "type": "context_compaction",
            "encrypted_content": "ENCRYPTED_CONTEXT_COMPACTION_SUMMARY",
        }
    });
    // The assistant message precedes the compaction item in the compaction stream.
    let compact_stream = responses::sse(vec![
        responses::ev_assistant_message("m-compact-noise", "IGNORED_COMPACT_REPLY"),
        compaction_item_done,
        responses::ev_completed("resp-compact"),
    ]);
    let follow_up_stream = responses::sse(vec![
        responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
        responses::ev_completed("resp-2"),
    ]);
    let responses_mock = responses::mount_sse_sequence(
        harness.server(),
        vec![first_turn_stream, compact_stream, follow_up_stream],
    )
    .await;

    // Turn 1: seed the history with a user message.
    let first_turn = Op::UserInput {
        environments: None,
        items: vec![UserInput::Text {
            text: "hello remote compact".into(),
            text_elements: Vec::new(),
        }],
        final_output_json_schema: None,
        responsesapi_client_metadata: None,
    };
    codex.submit(first_turn).await?;
    wait_for_turn_complete(&codex).await;

    // Manual compaction.
    codex.submit(Op::Compact).await?;
    wait_for_turn_complete(&codex).await;

    // Turn 2: the request built here is the one inspected below.
    let second_turn = Op::UserInput {
        environments: None,
        items: vec![UserInput::Text {
            text: "after compact".into(),
            text_elements: Vec::new(),
        }],
        final_output_json_schema: None,
        responsesapi_client_metadata: None,
    };
    codex.submit(second_turn).await?;
    wait_for_turn_complete(&codex).await;

    let response_requests = responses_mock.requests();
    let follow_up_request = response_requests.last().expect("follow-up request missing");
    let follow_up_body = follow_up_request.body_json().to_string();

    let keeps_compaction_item = follow_up_body.contains("\"type\":\"context_compaction\"");
    assert!(
        keeps_compaction_item,
        "expected follow-up request to preserve the v2 context_compaction item"
    );
    let keeps_payload = follow_up_body.contains("ENCRYPTED_CONTEXT_COMPACTION_SUMMARY");
    assert!(
        keeps_payload,
        "expected follow-up request to include the context compaction payload"
    );
    let leaks_extra_item = follow_up_body.contains("IGNORED_COMPACT_REPLY");
    assert!(
        !leaks_extra_item,
        "expected follow-up request to ignore unrelated output items from the compaction stream"
    );

    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_filters_deferred_dynamic_tools() -> Result<()> {
skip_if_no_network!(Ok(()));
+8
View File
@@ -227,6 +227,8 @@ pub enum Feature {
ResponsesWebsockets,
/// Legacy rollout flag for Responses API WebSocket transport v2 experiments.
ResponsesWebsocketsV2,
/// Enable remote compaction v2 over the normal Responses API.
RemoteCompactionV2,
/// Enable workspace dependency support.
WorkspaceDependencies,
}
@@ -1121,6 +1123,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Removed,
default_enabled: false,
},
FeatureSpec {
id: Feature::RemoteCompactionV2,
key: "remote_compaction_v2",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::WorkspaceDependencies,
key: "workspace_dependencies",
+10
View File
@@ -119,6 +119,16 @@ fn request_permissions_tool_is_under_development() {
assert_eq!(Feature::RequestPermissionsTool.default_enabled(), false);
}
/// `remote_compaction_v2` resolves by key to `Feature::RemoteCompactionV2`, is disabled by
/// default, and is registered at the `UnderDevelopment` stage.
#[test]
fn remote_compaction_v2_is_under_development() {
    let resolved = feature_for_key("remote_compaction_v2");
    assert_eq!(resolved, Some(Feature::RemoteCompactionV2));
    assert_eq!(Feature::RemoteCompactionV2.default_enabled(), false);
    assert_eq!(Feature::RemoteCompactionV2.stage(), Stage::UnderDevelopment);
}
#[test]
fn terminal_resize_reflow_is_experimental_and_enabled_by_default() {
assert_eq!(
@@ -1096,6 +1096,7 @@ impl SessionTelemetry {
ResponseItem::WebSearchCall { .. } => "web_search_call".into(),
ResponseItem::ImageGenerationCall { .. } => "image_generation_call".into(),
ResponseItem::Compaction { .. } => "compaction".into(),
ResponseItem::ContextCompaction { .. } => "context_compaction".into(),
ResponseItem::Other => "other".into(),
}
}
+35
View File
@@ -881,6 +881,11 @@ pub enum ResponseItem {
},
#[serde(alias = "compaction_summary")]
Compaction { encrypted_content: String },
ContextCompaction {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
encrypted_content: Option<String>,
},
#[serde(other)]
Other,
}
@@ -2403,6 +2408,36 @@ mod tests {
Ok(())
}
/// A `context_compaction` JSON object with `encrypted_content` parses into
/// `ResponseItem::ContextCompaction` carrying that payload.
#[test]
fn deserializes_context_compaction() -> Result<()> {
    let parsed = serde_json::from_str::<ResponseItem>(
        r#"{"type":"context_compaction","encrypted_content":"abc"}"#,
    )?;
    let expected = ResponseItem::ContextCompaction {
        encrypted_content: Some("abc".into()),
    };

    assert_eq!(parsed, expected);
    Ok(())
}
/// A `ContextCompaction` item with no payload serializes to just its `type` tag; the
/// `encrypted_content` key is absent rather than `null`.
#[test]
fn serializes_context_compaction_trigger_without_payload() -> Result<()> {
    let trigger = ResponseItem::ContextCompaction {
        encrypted_content: None,
    };

    let wire = serde_json::to_value(trigger)?;
    let expected = serde_json::json!({
        "type": "context_compaction",
    });

    assert_eq!(wire, expected);
    Ok(())
}
#[test]
fn deserializes_legacy_ghost_snapshot_as_other() -> Result<()> {
let json = r#"{
@@ -121,13 +121,15 @@ fn normalize_model_item(
.and_then(Value::as_str)
.map(ToString::to_string),
}),
"compaction" | "compaction_summary" => Ok(NormalizedConversationItem {
role: ConversationRole::Assistant,
channel: Some(ConversationChannel::Summary),
kind: ConversationItemKind::Message,
body: compaction_body(item, raw_payload)?,
call_id: None,
}),
"compaction" | "compaction_summary" | "context_compaction" => {
Ok(NormalizedConversationItem {
role: ConversationRole::Assistant,
channel: Some(ConversationChannel::Summary),
kind: ConversationItemKind::Message,
body: compaction_body(item, raw_payload)?,
call_id: None,
})
}
_ => bail!(
"unsupported model item type {item_type} in payload {}",
raw_payload.raw_payload_id
@@ -786,6 +786,75 @@ fn compaction_boundary_repeats_prefix_and_reuses_replacement_items() -> anyhow::
Ok(())
}
/// Replays a trace bundle whose compaction checkpoint ends with a `context_compaction` item.
///
/// After replay, the third replacement item of `compaction-1` must be a summary-channel
/// message whose body is a single encoded `encrypted_content` part holding the payload.
#[test]
fn context_compaction_boundary_repeats_prefix_and_reuses_replacement_items() -> anyhow::Result<()> {
    let temp = TempDir::new()?;
    let writer = create_started_writer(&temp)?;

    // Turn 1: a plain inference request with the pre-compaction history.
    start_turn(&writer, "turn-1")?;
    let developer = message("developer", "follow repo rules");
    let user = message("user", "count files");
    let first_request_body = json!({
        "input": [developer, user]
    });
    let first_request =
        writer.write_json_payload(RawPayloadKind::InferenceRequest, &first_request_body)?;
    append_inference_start(&writer, "inference-1", "turn-1", first_request)?;

    // Compaction checkpoint: the replacement history ends with the `context_compaction` item.
    let summary = message("user", "summary from compacted history");
    let compaction_summary = json!({
        "type": "context_compaction",
        "encrypted_content": "encrypted-summary",
    });
    let checkpoint_body = json!({
        "input_history": [developer, user],
        "replacement_history": [user, summary, compaction_summary]
    });
    let checkpoint =
        writer.write_json_payload(RawPayloadKind::CompactionCheckpoint, &checkpoint_body)?;
    let installed = RawTraceEventPayload::CompactionInstalled {
        compaction_id: "compaction-1".to_string(),
        checkpoint_payload: checkpoint,
    };
    writer.append_with_context(trace_context("turn-1"), installed)?;

    // Turn 2: the post-compaction request includes the replacement items.
    start_turn(&writer, "turn-2")?;
    let post_compaction_body = json!({
        "input": [developer, user, summary, compaction_summary]
    });
    let post_compaction_request =
        writer.write_json_payload(RawPayloadKind::InferenceRequest, &post_compaction_body)?;
    append_inference_start(&writer, "inference-2", "turn-2", post_compaction_request)?;

    let rollout = replay_bundle(temp.path())?;
    let compaction = &rollout.compactions["compaction-1"];
    // Index 2 is the `context_compaction` entry of the replacement history above.
    let compacted_item = &rollout.conversation_items[&compaction.replacement_item_ids[2]];

    assert_eq!(compacted_item.channel, Some(ConversationChannel::Summary));
    assert_eq!(compacted_item.kind, ConversationItemKind::Message);
    let expected_parts = vec![ConversationPart::Encoded {
        label: "encrypted_content".to_string(),
        value: "encrypted-summary".to_string(),
    }];
    assert_eq!(compacted_item.body.parts, expected_parts);

    Ok(())
}
#[test]
fn tool_call_links_model_call_and_followup_output_items() -> anyhow::Result<()> {
let temp = TempDir::new()?;
+3 -1
View File
@@ -37,7 +37,8 @@ pub fn should_persist_response_item(item: &ResponseItem) -> bool {
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::Compaction { .. } => true,
| ResponseItem::Compaction { .. }
| ResponseItem::ContextCompaction { .. } => true,
ResponseItem::Other => false,
}
}
@@ -58,6 +59,7 @@ pub fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool {
ResponseItem::Reasoning { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::Compaction { .. }
| ResponseItem::ContextCompaction { .. }
| ResponseItem::Other => false,
}
}