Rewrite oversized tool outputs during remote compaction (#26251)

## Why

When trying to fit history under compaction limit rewrite output items
instead of removing them entirely. Otherwise we're breaking
incrementality in relation to the previous response.
This commit is contained in:
pakrym-oai
2026-06-03 15:25:50 -07:00
committed by GitHub
Unverified
parent 11bceb8f8b
commit 4231472c03
6 changed files with 330 additions and 89 deletions
+69 -23
View File
@@ -9,7 +9,6 @@ use crate::compact::insert_initial_context_before_last_real_user_or_summary;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::context_manager::estimate_response_item_model_visible_bytes;
use crate::context_manager::is_codex_generated_item;
use crate::hook_runtime::PostCompactHookOutcome;
use crate::hook_runtime::PreCompactHookOutcome;
use crate::hook_runtime::run_post_compact_hooks;
@@ -28,6 +27,8 @@ use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
@@ -38,6 +39,9 @@ use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::info;
const CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE: &str =
"Output exceeded the available model context and was truncated";
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
@@ -168,21 +172,21 @@ async fn run_remote_compact_task_inner_impl(
.await;
let mut history = sess.clone_history().await;
let base_instructions = sess.get_base_instructions().await;
let deleted_items = trim_function_call_history_to_fit_context_window(
let rewritten_outputs = trim_function_call_history_to_fit_context_window(
&mut history,
turn_context.as_ref(),
&base_instructions,
);
if deleted_items > 0 {
if rewritten_outputs > 0 {
info!(
turn_id = %turn_context.sub_id,
deleted_items,
"trimmed history items before remote compaction"
rewritten_outputs,
"rewrote history outputs before remote compaction"
);
}
// This is the history selected for remote compaction, after any trimming required to fit the
// compact endpoint. The checkpoint below records it separately from the next sampling request,
// whose prompt will repeat current developer/context prefix items.
// This is the history selected for remote compaction, after any output rewriting required to
// fit the compact endpoint. The checkpoint below records it separately from the next sampling
// request, whose prompt will repeat current developer/context prefix items.
let trace_input_history = history.raw_items().to_vec();
let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities);
let tool_router = built_tools(
@@ -379,26 +383,68 @@ pub(crate) fn trim_function_call_history_to_fit_context_window(
turn_context: &TurnContext,
base_instructions: &BaseInstructions,
) -> usize {
let mut deleted_items = 0usize;
let Some(context_window) = turn_context.model_context_window() else {
return deleted_items;
return 0;
};
let mut rewritten_outputs = 0usize;
let item_count = history.raw_items().len();
while history
.estimate_token_count_with_base_instructions(base_instructions)
.is_some_and(|estimated_tokens| estimated_tokens > context_window)
{
let Some(last_item) = history.raw_items().last() else {
for index in (0..item_count).rev() {
if history
.estimate_token_count_with_base_instructions(base_instructions)
.is_none_or(|estimated_tokens| estimated_tokens <= context_window)
{
break;
}
let Some(rewritten_item) = history
.raw_items()
.get(index)
.and_then(rewritten_output_for_context_window)
else {
break;
};
if !is_codex_generated_item(last_item) {
break;
}
if !history.remove_last_item() {
break;
}
deleted_items += 1;
let mut items = history.raw_items().to_vec();
items[index] = rewritten_item;
history.replace(items);
rewritten_outputs += 1;
}
deleted_items
rewritten_outputs
}
fn rewritten_output_for_context_window(item: &ResponseItem) -> Option<ResponseItem> {
Some(match item {
ResponseItem::FunctionCallOutput { call_id, output } => ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: truncated_output_payload(output),
},
ResponseItem::CustomToolCallOutput {
call_id,
name,
output,
} => ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
name: name.clone(),
output: truncated_output_payload(output),
},
ResponseItem::ToolSearchOutput {
call_id,
status,
execution,
..
} => ResponseItem::ToolSearchOutput {
call_id: call_id.clone(),
status: status.clone(),
execution: execution.clone(),
tools: Vec::new(),
},
_ => return None,
})
}
fn truncated_output_payload(output: &FunctionCallOutputPayload) -> FunctionCallOutputPayload {
FunctionCallOutputPayload {
body: FunctionCallOutputBody::Text(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
success: output.success,
}
}
+4 -4
View File
@@ -185,16 +185,16 @@ async fn run_remote_compact_task_inner_impl(
let mut history = sess.clone_history().await;
let base_instructions = sess.get_base_instructions().await;
let deleted_items = trim_function_call_history_to_fit_context_window(
let rewritten_outputs = trim_function_call_history_to_fit_context_window(
&mut history,
turn_context.as_ref(),
&base_instructions,
);
if deleted_items > 0 {
if rewritten_outputs > 0 {
info!(
turn_id = %turn_context.sub_id,
deleted_items,
"trimmed history items before remote compaction v2"
rewritten_outputs,
"rewrote history outputs before remote compaction v2"
);
}
@@ -174,16 +174,6 @@ impl ContextManager {
}
}
pub(crate) fn remove_last_item(&mut self) -> bool {
if let Some(removed) = self.items.pop() {
normalize::remove_corresponding_for(&mut self.items, &removed);
self.history_version = self.history_version.saturating_add(1);
true
} else {
false
}
}
pub(crate) fn replace(&mut self, items: Vec<ResponseItem>) {
self.items = items;
self.history_version = self.history_version.saturating_add(1);
@@ -734,15 +724,6 @@ fn is_model_generated_item(item: &ResponseItem) -> bool {
}
}
pub(crate) fn is_codex_generated_item(item: &ResponseItem) -> bool {
matches!(
item,
ResponseItem::FunctionCallOutput { .. }
| ResponseItem::ToolSearchOutput { .. }
| ResponseItem::CustomToolCallOutput { .. }
) || matches!(item, ResponseItem::Message { role, .. } if role == "developer")
}
pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool {
let ResponseItem::Message { role, content, .. } = item else {
return false;
@@ -651,28 +651,6 @@ fn remove_first_item_removes_matching_call_for_output() {
assert_eq!(h.raw_items(), vec![]);
}
#[test]
fn remove_last_item_removes_matching_call_for_output() {
let items = vec![
user_msg("before tool call"),
ResponseItem::FunctionCall {
id: None,
name: "do_it".to_string(),
namespace: None,
arguments: "{}".to_string(),
call_id: "call-delete-last".to_string(),
},
ResponseItem::FunctionCallOutput {
call_id: "call-delete-last".to_string(),
output: FunctionCallOutputPayload::from_text("ok".to_string()),
},
];
let mut h = create_history_with_items(items);
assert!(h.remove_last_item());
assert_eq!(h.raw_items(), vec![user_msg("before tool call")]);
}
#[test]
fn replace_last_turn_images_replaces_tool_output_images() {
let items = vec![
-1
View File
@@ -5,6 +5,5 @@ pub(crate) mod updates;
pub(crate) use history::ContextManager;
pub(crate) use history::TotalTokenUsageBreakdown;
pub(crate) use history::estimate_response_item_model_visible_bytes;
pub(crate) use history::is_codex_generated_item;
pub(crate) use history::is_user_turn_boundary;
pub(crate) use history::truncate_function_output_payload;
+257 -20
View File
@@ -24,6 +24,7 @@ use codex_protocol::protocol::RealtimeOutputModality;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::user_input::UserInput;
use core_test_support::apps_test_server::configure_search_capable_model;
use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
use core_test_support::context_snapshot::ContextSnapshotRenderMode;
@@ -44,6 +45,9 @@ use serde_json::json;
use tokio::time::Duration;
use wiremock::ResponseTemplate;
const CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE: &str =
"Output exceeded the available model context and was truncated";
fn approx_token_count(text: &str) -> i64 {
i64::try_from(text.len().saturating_add(3) / 4).unwrap_or(i64::MAX)
}
@@ -1449,22 +1453,146 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R
"expected compact request to keep the older function call/result pair"
);
assert!(
!compact_request.has_function_call(trimmed_call_id)
&& compact_request
.function_call_output_text(trimmed_call_id)
.is_none(),
"expected compact request to drop the trailing function call/result pair past the boundary"
compact_request.has_function_call(trimmed_call_id),
"expected compact request to retain the trailing function call"
);
assert_eq!(
compact_request.function_call_output_text(trimmed_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected compact request to rewrite the trailing function call output past the boundary"
);
assert_eq!(
compact_request.inputs_of_type("function_call").len(),
1,
"expected exactly one function call after trimming"
2,
"expected both function calls after rewriting the trailing output"
);
assert_eq!(
compact_request.inputs_of_type("function_call_output").len(),
1,
"expected exactly one function call output after trimming"
2,
"expected both function call outputs after rewriting the trailing output"
);
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_rewrites_multiple_trailing_function_call_outputs() -> Result<()> {
skip_if_no_network!(Ok(()));
let first_user_message = "turn with retained shell call";
let second_user_message = "turn with parallel shell calls";
let retained_call_id = "retained-call";
let first_trimmed_call_id = "first-trimmed-call";
let second_trimmed_call_id = "second-trimmed-call";
let retained_command = "echo retained-shell-output";
let first_trimmed_command = "yes x | head -n 3000";
let second_trimmed_command = "yes y | head -n 3000";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(2_000);
config.model_auto_compact_token_limit = Some(200_000);
}),
)
.await?;
let codex = harness.test().codex.clone();
responses::mount_sse_sequence(
harness.server(),
vec![
sse(vec![
responses::ev_shell_command_call(retained_call_id, retained_command),
responses::ev_completed("retained-call-response"),
]),
sse(vec![
responses::ev_assistant_message("retained-assistant", "retained complete"),
responses::ev_completed("retained-final-response"),
]),
sse(vec![
responses::ev_shell_command_call(first_trimmed_call_id, first_trimmed_command),
responses::ev_shell_command_call(second_trimmed_call_id, second_trimmed_command),
responses::ev_completed("parallel-call-response"),
]),
],
)
.await;
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: first_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: second_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
"REMOTE_COMPACT_SUMMARY",
)
.await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let compact_request = compact_mock.single_request();
assert!(
compact_request.has_function_call(retained_call_id)
&& compact_request
.function_call_output_text(retained_call_id)
.is_some(),
"expected compact request to keep the older function call/result pair"
);
assert!(
compact_request.has_function_call(first_trimmed_call_id)
&& compact_request.has_function_call(second_trimmed_call_id),
"expected compact request to retain both trailing parallel function calls"
);
assert_eq!(
compact_request.function_call_output_text(first_trimmed_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected compact request to rewrite the first trailing function call output"
);
assert_eq!(
compact_request.function_call_output_text(second_trimmed_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected compact request to rewrite the second trailing function call output"
);
assert_eq!(
compact_request.inputs_of_type("function_call").len(),
3,
"expected all function calls after rewriting trailing outputs"
);
assert_eq!(
compact_request.inputs_of_type("function_call_output").len(),
3,
"expected all function call outputs after rewriting trailing outputs"
);
Ok(())
@@ -1600,22 +1728,126 @@ async fn auto_remote_compact_trims_function_call_history_to_fit_context_window()
"expected compact request to keep the older function call/result pair"
);
assert!(
!compact_request.has_function_call(trimmed_call_id)
&& compact_request
.function_call_output_text(trimmed_call_id)
.is_none(),
"expected compact request to drop the trailing function call/result pair past the boundary"
compact_request.has_function_call(trimmed_call_id),
"expected compact request to retain the trailing function call"
);
assert_eq!(
compact_request.function_call_output_text(trimmed_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected compact request to rewrite the trailing function call output past the boundary"
);
assert_eq!(
compact_request.inputs_of_type("function_call").len(),
1,
"expected exactly one function call after trimming"
2,
"expected both function calls after rewriting the trailing output"
);
assert_eq!(
compact_request.inputs_of_type("function_call_output").len(),
1,
"expected exactly one function call output after trimming"
2,
"expected both function call outputs after rewriting the trailing output"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_trims_tool_search_output_to_empty_tools_array() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let search_call_id = "tool-search-1";
let tool_name = "oversized_dynamic_tool";
let tool_description = format!(
"Oversized deferred tool for remote compaction. {}",
"x".repeat(20_000)
);
let _responses_mock = mount_sse_once(
&server,
sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_tool_search_call(
search_call_id,
&json!({
"query": "oversized deferred tool",
"limit": 8,
}),
),
responses::ev_completed("resp-1"),
]),
)
.await;
let input_schema = json!({
"type": "object",
"properties": {
"mode": { "type": "string" },
},
"required": ["mode"],
"additionalProperties": false,
});
let dynamic_tool = DynamicToolSpec {
namespace: Some("codex_app".to_string()),
name: tool_name.to_string(),
description: tool_description,
input_schema,
defer_loading: true,
};
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
configure_search_capable_model(config);
config.model_context_window = Some(2_000);
});
let mut test = builder.build(&server).await?;
let new_thread = test
.thread_manager
.start_thread_with_tools(test.config.clone(), vec![dynamic_tool])
.await?;
test.codex = new_thread.thread;
test.session_configured = new_thread.session_configured;
let codex = test.codex.clone();
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "Find the oversized deferred tool".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
let compact_mock =
responses::mount_compact_user_history_with_summary_once(&server, "REMOTE_COMPACT_SUMMARY")
.await;
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
let compact_request = compact_mock.single_request();
let compact_tools = compact_request
.tool_search_output(search_call_id)
.get("tools")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
assert!(
compact_request
.inputs_of_type("tool_search_output")
.iter()
.any(|item| item.get("call_id").and_then(Value::as_str) == Some(search_call_id)),
"expected compact request to retain the tool_search_output item"
);
assert!(
compact_tools.is_empty(),
"expected compact request to rewrite trailing tool_search output to an empty tools array"
);
Ok(())
@@ -1936,8 +2168,13 @@ async fn remote_compact_trim_estimate_uses_session_base_instructions() -> Result
"expected remote compact request to preserve older function call history"
);
assert!(
!override_compact_request.has_function_call(override_trailing_call_id),
"expected remote compact request to trim trailing function call history with override instructions"
override_compact_request.has_function_call(override_trailing_call_id),
"expected remote compact request to preserve trailing function call history with override instructions"
);
assert_eq!(
override_compact_request.function_call_output_text(override_trailing_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected remote compact request to rewrite trailing function call output with override instructions"
);
Ok(())