From 4231472c03e6c6e3374b1c19ef096ee28476f9dd Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 3 Jun 2026 15:25:50 -0700 Subject: [PATCH] Rewrite oversized tool outputs during remote compaction (#26251) ## Why When trying to fit history under compaction limit rewrite output items instead of removing them entirely. Otherwise we're breaking incrementality in relation to the previous response. --- codex-rs/core/src/compact_remote.rs | 92 ++++-- codex-rs/core/src/compact_remote_v2.rs | 8 +- codex-rs/core/src/context_manager/history.rs | 19 -- .../core/src/context_manager/history_tests.rs | 22 -- codex-rs/core/src/context_manager/mod.rs | 1 - codex-rs/core/tests/suite/compact_remote.rs | 277 ++++++++++++++++-- 6 files changed, 330 insertions(+), 89 deletions(-) diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index c2ef417b9..cbac86fb9 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -9,7 +9,6 @@ use crate::compact::insert_initial_context_before_last_real_user_or_summary; use crate::context_manager::ContextManager; use crate::context_manager::TotalTokenUsageBreakdown; use crate::context_manager::estimate_response_item_model_visible_bytes; -use crate::context_manager::is_codex_generated_item; use crate::hook_runtime::PostCompactHookOutcome; use crate::hook_runtime::PreCompactHookOutcome; use crate::hook_runtime::run_post_compact_hooks; @@ -28,6 +27,8 @@ use codex_protocol::error::Result as CodexResult; use codex_protocol::items::ContextCompactionItem; use codex_protocol::items::TurnItem; use codex_protocol::models::BaseInstructions; +use codex_protocol::models::FunctionCallOutputBody; +use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; @@ -38,6 +39,9 @@ use tokio_util::sync::CancellationToken; use tracing::error; use tracing::info; +const CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE: &str = + "Output exceeded the available model context and was truncated"; + pub(crate) async fn run_inline_remote_auto_compact_task( sess: Arc, turn_context: Arc, @@ -168,21 +172,21 @@ async fn run_remote_compact_task_inner_impl( .await; let mut history = sess.clone_history().await; let base_instructions = sess.get_base_instructions().await; - let deleted_items = trim_function_call_history_to_fit_context_window( + let rewritten_outputs = trim_function_call_history_to_fit_context_window( &mut history, turn_context.as_ref(), &base_instructions, ); - if deleted_items > 0 { + if rewritten_outputs > 0 { info!( turn_id = %turn_context.sub_id, - deleted_items, - "trimmed history items before remote compaction" + rewritten_outputs, + "rewrote history outputs before remote compaction" ); } - // This is the history selected for remote compaction, after any trimming required to fit the - // compact endpoint. The checkpoint below records it separately from the next sampling request, - // whose prompt will repeat current developer/context prefix items. + // This is the history selected for remote compaction, after any output rewriting required to + // fit the compact endpoint. The checkpoint below records it separately from the next sampling + // request, whose prompt will repeat current developer/context prefix items. let trace_input_history = history.raw_items().to_vec(); let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities); let tool_router = built_tools( @@ -379,26 +383,68 @@ pub(crate) fn trim_function_call_history_to_fit_context_window( turn_context: &TurnContext, base_instructions: &BaseInstructions, ) -> usize { - let mut deleted_items = 0usize; let Some(context_window) = turn_context.model_context_window() else { - return deleted_items; + return 0; }; + let mut rewritten_outputs = 0usize; + let item_count = history.raw_items().len(); - while history - .estimate_token_count_with_base_instructions(base_instructions) - .is_some_and(|estimated_tokens| estimated_tokens > context_window) - { - let Some(last_item) = history.raw_items().last() else { + for index in (0..item_count).rev() { + if history + .estimate_token_count_with_base_instructions(base_instructions) + .is_none_or(|estimated_tokens| estimated_tokens <= context_window) + { + break; + } + let Some(rewritten_item) = history + .raw_items() + .get(index) + .and_then(rewritten_output_for_context_window) + else { break; }; - if !is_codex_generated_item(last_item) { - break; - } - if !history.remove_last_item() { - break; - } - deleted_items += 1; + let mut items = history.raw_items().to_vec(); + items[index] = rewritten_item; + history.replace(items); + rewritten_outputs += 1; } - deleted_items + rewritten_outputs +} + +fn rewritten_output_for_context_window(item: &ResponseItem) -> Option { + Some(match item { + ResponseItem::FunctionCallOutput { call_id, output } => ResponseItem::FunctionCallOutput { + call_id: call_id.clone(), + output: truncated_output_payload(output), + }, + ResponseItem::CustomToolCallOutput { + call_id, + name, + output, + } => ResponseItem::CustomToolCallOutput { + call_id: call_id.clone(), + name: name.clone(), + output: truncated_output_payload(output), + }, + ResponseItem::ToolSearchOutput { + call_id, + status, + execution, + .. + } => ResponseItem::ToolSearchOutput { + call_id: call_id.clone(), + status: status.clone(), + execution: execution.clone(), + tools: Vec::new(), + }, + _ => return None, + }) +} + +fn truncated_output_payload(output: &FunctionCallOutputPayload) -> FunctionCallOutputPayload { + FunctionCallOutputPayload { + body: FunctionCallOutputBody::Text(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()), + success: output.success, + } } diff --git a/codex-rs/core/src/compact_remote_v2.rs b/codex-rs/core/src/compact_remote_v2.rs index eb10eb211..77ef1629c 100644 --- a/codex-rs/core/src/compact_remote_v2.rs +++ b/codex-rs/core/src/compact_remote_v2.rs @@ -185,16 +185,16 @@ async fn run_remote_compact_task_inner_impl( let mut history = sess.clone_history().await; let base_instructions = sess.get_base_instructions().await; - let deleted_items = trim_function_call_history_to_fit_context_window( + let rewritten_outputs = trim_function_call_history_to_fit_context_window( &mut history, turn_context.as_ref(), &base_instructions, ); - if deleted_items > 0 { + if rewritten_outputs > 0 { info!( turn_id = %turn_context.sub_id, - deleted_items, - "trimmed history items before remote compaction v2" + rewritten_outputs, + "rewrote history outputs before remote compaction v2" ); } diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index e5f6e4132..0be9bbc18 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -174,16 +174,6 @@ impl ContextManager { } } - pub(crate) fn remove_last_item(&mut self) -> bool { - if let Some(removed) = self.items.pop() { - normalize::remove_corresponding_for(&mut self.items, &removed); - self.history_version = self.history_version.saturating_add(1); - true - } else { - false - } - } - pub(crate) fn replace(&mut self, items: Vec) { self.items = items; self.history_version = self.history_version.saturating_add(1); @@ -734,15 +724,6 @@ fn is_model_generated_item(item: &ResponseItem) -> bool { } } -pub(crate) fn is_codex_generated_item(item: &ResponseItem) -> bool { - matches!( - item, - ResponseItem::FunctionCallOutput { .. } - | ResponseItem::ToolSearchOutput { .. } - | ResponseItem::CustomToolCallOutput { .. } - ) || matches!(item, ResponseItem::Message { role, .. } if role == "developer") -} - pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool { let ResponseItem::Message { role, content, .. } = item else { return false; diff --git a/codex-rs/core/src/context_manager/history_tests.rs b/codex-rs/core/src/context_manager/history_tests.rs index 7ee22be44..bb22749cb 100644 --- a/codex-rs/core/src/context_manager/history_tests.rs +++ b/codex-rs/core/src/context_manager/history_tests.rs @@ -651,28 +651,6 @@ fn remove_first_item_removes_matching_call_for_output() { assert_eq!(h.raw_items(), vec![]); } -#[test] -fn remove_last_item_removes_matching_call_for_output() { - let items = vec![ - user_msg("before tool call"), - ResponseItem::FunctionCall { - id: None, - name: "do_it".to_string(), - namespace: None, - arguments: "{}".to_string(), - call_id: "call-delete-last".to_string(), - }, - ResponseItem::FunctionCallOutput { - call_id: "call-delete-last".to_string(), - output: FunctionCallOutputPayload::from_text("ok".to_string()), - }, - ]; - let mut h = create_history_with_items(items); - - assert!(h.remove_last_item()); - assert_eq!(h.raw_items(), vec![user_msg("before tool call")]); -} - #[test] fn replace_last_turn_images_replaces_tool_output_images() { let items = vec![ diff --git a/codex-rs/core/src/context_manager/mod.rs b/codex-rs/core/src/context_manager/mod.rs index 9dd85d1ed..2295c49df 100644 --- a/codex-rs/core/src/context_manager/mod.rs +++ b/codex-rs/core/src/context_manager/mod.rs @@ -5,6 +5,5 @@ pub(crate) mod updates; pub(crate) use history::ContextManager; pub(crate) use history::TotalTokenUsageBreakdown; pub(crate) use history::estimate_response_item_model_visible_bytes; -pub(crate) use history::is_codex_generated_item; pub(crate) use history::is_user_turn_boundary; pub(crate) use history::truncate_function_output_payload; diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index f6a87b6c7..ebb78a017 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -24,6 +24,7 @@ use codex_protocol::protocol::RealtimeOutputModality; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::user_input::UserInput; +use core_test_support::apps_test_server::configure_search_capable_model; use core_test_support::context_snapshot; use core_test_support::context_snapshot::ContextSnapshotOptions; use core_test_support::context_snapshot::ContextSnapshotRenderMode; @@ -44,6 +45,9 @@ use serde_json::json; use tokio::time::Duration; use wiremock::ResponseTemplate; +const CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE: &str = + "Output exceeded the available model context and was truncated"; + fn approx_token_count(text: &str) -> i64 { i64::try_from(text.len().saturating_add(3) / 4).unwrap_or(i64::MAX) } @@ -1449,22 +1453,146 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R "expected compact request to keep the older function call/result pair" ); assert!( - !compact_request.has_function_call(trimmed_call_id) - && compact_request - .function_call_output_text(trimmed_call_id) - .is_none(), - "expected compact request to drop the trailing function call/result pair past the boundary" + compact_request.has_function_call(trimmed_call_id), + "expected compact request to retain the trailing function call" + ); + assert_eq!( + compact_request.function_call_output_text(trimmed_call_id), + Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()), + "expected compact request to rewrite the trailing function call output past the boundary" ); assert_eq!( compact_request.inputs_of_type("function_call").len(), - 1, - "expected exactly one function call after trimming" + 2, + "expected both function calls after rewriting the trailing output" ); assert_eq!( compact_request.inputs_of_type("function_call_output").len(), - 1, - "expected exactly one function call output after trimming" + 2, + "expected both function call outputs after rewriting the trailing output" + ); + + Ok(()) +} + +#[cfg_attr(target_os = "windows", ignore)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_compact_rewrites_multiple_trailing_function_call_outputs() -> Result<()> { + skip_if_no_network!(Ok(())); + + let first_user_message = "turn with retained shell call"; + let second_user_message = "turn with parallel shell calls"; + let retained_call_id = "retained-call"; + let first_trimmed_call_id = "first-trimmed-call"; + let second_trimmed_call_id = "second-trimmed-call"; + let retained_command = "echo retained-shell-output"; + let first_trimmed_command = "yes x | head -n 3000"; + let second_trimmed_command = "yes y | head -n 3000"; + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.model_context_window = Some(2_000); + config.model_auto_compact_token_limit = Some(200_000); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + responses::mount_sse_sequence( + harness.server(), + vec![ + sse(vec![ + responses::ev_shell_command_call(retained_call_id, retained_command), + responses::ev_completed("retained-call-response"), + ]), + sse(vec![ + responses::ev_assistant_message("retained-assistant", "retained complete"), + responses::ev_completed("retained-final-response"), + ]), + sse(vec![ + responses::ev_shell_command_call(first_trimmed_call_id, first_trimmed_command), + responses::ev_shell_command_call(second_trimmed_call_id, second_trimmed_command), + responses::ev_completed("parallel-call-response"), + ]), + ], + ) + .await; + + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: first_user_message.into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + additional_context: Default::default(), + thread_settings: Default::default(), + }) + .await?; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: second_user_message.into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + additional_context: Default::default(), + thread_settings: Default::default(), + }) + .await?; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + let compact_mock = responses::mount_compact_user_history_with_summary_once( + harness.server(), + "REMOTE_COMPACT_SUMMARY", + ) + .await; + + codex.submit(Op::Compact).await?; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + let compact_request = compact_mock.single_request(); + assert!( + compact_request.has_function_call(retained_call_id) + && compact_request + .function_call_output_text(retained_call_id) + .is_some(), + "expected compact request to keep the older function call/result pair" + ); + assert!( + compact_request.has_function_call(first_trimmed_call_id) + && compact_request.has_function_call(second_trimmed_call_id), + "expected compact request to retain both trailing parallel function calls" + ); + assert_eq!( + compact_request.function_call_output_text(first_trimmed_call_id), + Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()), + "expected compact request to rewrite the first trailing function call output" + ); + assert_eq!( + compact_request.function_call_output_text(second_trimmed_call_id), + Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()), + "expected compact request to rewrite the second trailing function call output" + ); + + assert_eq!( + compact_request.inputs_of_type("function_call").len(), + 3, + "expected all function calls after rewriting trailing outputs" + ); + assert_eq!( + compact_request.inputs_of_type("function_call_output").len(), + 3, + "expected all function call outputs after rewriting trailing outputs" ); Ok(()) @@ -1600,22 +1728,126 @@ async fn auto_remote_compact_trims_function_call_history_to_fit_context_window() "expected compact request to keep the older function call/result pair" ); assert!( - !compact_request.has_function_call(trimmed_call_id) - && compact_request - .function_call_output_text(trimmed_call_id) - .is_none(), - "expected compact request to drop the trailing function call/result pair past the boundary" + compact_request.has_function_call(trimmed_call_id), + "expected compact request to retain the trailing function call" + ); + assert_eq!( + compact_request.function_call_output_text(trimmed_call_id), + Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()), + "expected compact request to rewrite the trailing function call output past the boundary" ); assert_eq!( compact_request.inputs_of_type("function_call").len(), - 1, - "expected exactly one function call after trimming" + 2, + "expected both function calls after rewriting the trailing output" ); assert_eq!( compact_request.inputs_of_type("function_call_output").len(), - 1, - "expected exactly one function call output after trimming" + 2, + "expected both function call outputs after rewriting the trailing output" + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_compact_trims_tool_search_output_to_empty_tools_array() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let search_call_id = "tool-search-1"; + let tool_name = "oversized_dynamic_tool"; + let tool_description = format!( + "Oversized deferred tool for remote compaction. {}", + "x".repeat(20_000) + ); + let _responses_mock = mount_sse_once( + &server, + sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_tool_search_call( + search_call_id, + &json!({ + "query": "oversized deferred tool", + "limit": 8, + }), + ), + responses::ev_completed("resp-1"), + ]), + ) + .await; + + let input_schema = json!({ + "type": "object", + "properties": { + "mode": { "type": "string" }, + }, + "required": ["mode"], + "additionalProperties": false, + }); + let dynamic_tool = DynamicToolSpec { + namespace: Some("codex_app".to_string()), + name: tool_name.to_string(), + description: tool_description, + input_schema, + defer_loading: true, + }; + + let mut builder = test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + configure_search_capable_model(config); + config.model_context_window = Some(2_000); + }); + let mut test = builder.build(&server).await?; + let new_thread = test + .thread_manager + .start_thread_with_tools(test.config.clone(), vec![dynamic_tool]) + .await?; + test.codex = new_thread.thread; + test.session_configured = new_thread.session_configured; + let codex = test.codex.clone(); + + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: "Find the oversized deferred tool".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + additional_context: Default::default(), + thread_settings: Default::default(), + }) + .await?; + wait_for_turn_complete(&codex).await; + + let compact_mock = + responses::mount_compact_user_history_with_summary_once(&server, "REMOTE_COMPACT_SUMMARY") + .await; + + codex.submit(Op::Compact).await?; + wait_for_turn_complete(&codex).await; + + let compact_request = compact_mock.single_request(); + let compact_tools = compact_request + .tool_search_output(search_call_id) + .get("tools") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(); + assert!( + compact_request + .inputs_of_type("tool_search_output") + .iter() + .any(|item| item.get("call_id").and_then(Value::as_str) == Some(search_call_id)), + "expected compact request to retain the tool_search_output item" + ); + assert!( + compact_tools.is_empty(), + "expected compact request to rewrite trailing tool_search output to an empty tools array" ); Ok(()) @@ -1936,8 +2168,13 @@ async fn remote_compact_trim_estimate_uses_session_base_instructions() -> Result "expected remote compact request to preserve older function call history" ); assert!( - !override_compact_request.has_function_call(override_trailing_call_id), - "expected remote compact request to trim trailing function call history with override instructions" + override_compact_request.has_function_call(override_trailing_call_id), + "expected remote compact request to preserve trailing function call history with override instructions" + ); + assert_eq!( + override_compact_request.function_call_output_text(override_trailing_call_id), + Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()), + "expected remote compact request to rewrite trailing function call output with override instructions" ); Ok(())