diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 7b93195de..7a2eaf935 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -254,8 +254,8 @@ async fn run_compact_task_inner_impl( Ok(()) => { break; } - Err(CodexErr::Interrupted) => { - return Err(CodexErr::Interrupted); + Err(err @ (CodexErr::Interrupted | CodexErr::TurnAborted)) => { + return Err(err); } Err(e @ CodexErr::ContextWindowExceeded) => { if turn_input_len > 1 { @@ -657,7 +657,7 @@ async fn drain_to_completed( } Ok(ResponseEvent::Completed { token_usage, .. }) => { sess.update_token_usage_info(turn_context, token_usage.as_ref()) - .await; + .await?; return Ok(()); } Ok(_) => continue, diff --git a/codex-rs/core/src/compact_remote_v2.rs b/codex-rs/core/src/compact_remote_v2.rs index d941388b0..6f10a474a 100644 --- a/codex-rs/core/src/compact_remote_v2.rs +++ b/codex-rs/core/src/compact_remote_v2.rs @@ -165,6 +165,9 @@ async fn run_remote_compact_task_inner( attempt .track(sess.as_ref(), status, codex_error, analytics_details) .await; + if matches!(&result, Err(CodexErr::TurnAborted)) { + return result; + } if let Err(err) = result { sess.track_turn_codex_error(turn_context, &err); let event = EventMsg::Error( @@ -281,7 +284,7 @@ async fn run_remote_compact_task_inner_impl( token_usage, } = compaction_output_result?; if let Some(token_usage) = token_usage { - sess.record_rollout_budget_usage(&token_usage); + sess.record_rollout_budget_usage(&token_usage)?; analytics_details.active_context_tokens_before = Some(token_usage.input_tokens); analytics_details.compaction_summary_tokens = Some(token_usage.output_tokens); analytics_details.cached_input_tokens = Some(token_usage.cached_input_tokens); diff --git a/codex-rs/core/src/rollout_budget.rs b/codex-rs/core/src/rollout_budget.rs index accabac76..b8901a9e3 100644 --- a/codex-rs/core/src/rollout_budget.rs +++ b/codex-rs/core/src/rollout_budget.rs @@ -40,13 +40,15 @@ impl RolloutBudget { }); } - pub(crate) fn record_usage(&self, usage: &TokenUsage) { + /// Returns true once the configured budget is exhausted, including on later calls. + pub(crate) fn record_usage(&self, usage: &TokenUsage) -> bool { let Some(mut state) = self.lock() else { - return; + return false; }; state.weighted_tokens_used += usage.output_tokens.max(0) as f64 * state.config.sampling_token_weight + usage.non_cached_input() as f64 * state.config.prefill_token_weight; + state.weighted_tokens_used >= state.config.limit_tokens as f64 } pub(crate) fn pending_reminder( diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index c627fe537..3c1f06f6b 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -3410,17 +3410,19 @@ impl Session { &self, turn_context: &TurnContext, token_usage: Option<&TokenUsage>, - ) { - self.record_token_usage_info(turn_context, token_usage) + ) -> CodexResult<()> { + let result = self + .record_token_usage_info(turn_context, token_usage) .await; self.send_token_count_event(turn_context).await; + result } pub(crate) async fn record_token_usage_info( &self, turn_context: &TurnContext, token_usage: Option<&TokenUsage>, - ) { + ) -> CodexResult<()> { if let Some(token_usage) = token_usage { let token_info = { let mut state = self.state.lock().await; @@ -3434,7 +3436,7 @@ impl Session { } state.token_info() }; - self.record_rollout_budget_usage(token_usage); + let budget_result = self.record_rollout_budget_usage(token_usage); if let Some(token_info) = token_info.as_ref() { for contributor in self.services.extensions.token_usage_contributors() { contributor @@ -3447,7 +3449,9 @@ impl Session { .await; } } + budget_result?; } + Ok(()) } pub(crate) async fn recompute_token_usage(&self, turn_context: &TurnContext) { diff --git a/codex-rs/core/src/session/rollout_budget.rs b/codex-rs/core/src/session/rollout_budget.rs index 4627d95ee..fa90cf7b9 100644 --- a/codex-rs/core/src/session/rollout_budget.rs +++ b/codex-rs/core/src/session/rollout_budget.rs @@ -1,6 +1,8 @@ use super::session::Session; use super::turn_context::TurnContext; use crate::context::ContextualUserFragment; +use codex_protocol::error::CodexErr; +use codex_protocol::error::Result as CodexResult; use codex_protocol::protocol::TokenUsage; pub(super) async fn maybe_record_reminder( @@ -21,10 +23,15 @@ pub(super) async fn maybe_record_reminder( } impl Session { - pub(crate) fn record_rollout_budget_usage(&self, usage: &TokenUsage) { - self.services + pub(crate) fn record_rollout_budget_usage(&self, usage: &TokenUsage) -> CodexResult<()> { + if self + .services .agent_control .rollout_budget() - .record_usage(usage); + .record_usage(usage) + { + return Err(CodexErr::TurnAborted); + } + Ok(()) } } diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 6f8215530..bca6be6a7 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -70,6 +70,7 @@ use crate::state::ActiveTurn; use crate::state::TaskKind; use crate::tasks::SessionTask; use crate::tasks::SessionTaskContext; +use crate::tasks::SessionTaskResult; use crate::tasks::UserShellCommandMode; use crate::tasks::execute_user_shell_command; use crate::tools::ToolRouter; @@ -2150,10 +2151,12 @@ async fn record_token_usage_info_notifies_extension_contributors() { session .record_token_usage_info(&turn_context, Some(&first_usage)) - .await; + .await + .expect("first usage should be recorded"); session .record_token_usage_info(&turn_context, Some(&second_usage)) - .await; + .await + .expect("second usage should be recorded"); let mut expected_total_usage = first_usage.clone(); expected_total_usage.add_assign(&second_usage); @@ -6474,13 +6477,13 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() { _ctx: Arc, _input: Vec, _cancellation_token: CancellationToken, - ) -> Option { + ) -> SessionTaskResult { let mut trace = self .captured_trace .lock() .unwrap_or_else(std::sync::PoisonError::into_inner); *trace = current_span_w3c_trace_context(); - None + Ok(None) } } @@ -8676,8 +8679,8 @@ impl SessionTask for CompletingTask { _ctx: Arc, _input: Vec, _cancellation_token: CancellationToken, - ) -> Option { - None + ) -> SessionTaskResult { + Ok(None) } } @@ -8702,10 +8705,10 @@ impl SessionTask for NeverEndingTask { _ctx: Arc, _input: Vec, cancellation_token: CancellationToken, - ) -> Option { + ) -> SessionTaskResult { if self.listen_to_cancellation_token { cancellation_token.cancelled().await; - return None; + return Ok(None); } loop { sleep(Duration::from_secs(60)).await; @@ -8731,14 +8734,14 @@ impl SessionTask for GuardianDeniedApprovalTask { ctx: Arc, _input: Vec, cancellation_token: CancellationToken, - ) -> Option { + ) -> SessionTaskResult { let session = session.clone_session(); for _ in 0..3 { crate::guardian::record_guardian_denial_for_test(&session, &ctx, &ctx.sub_id).await; } cancellation_token.cancelled().await; - None + Ok(None) } } @@ -8961,7 +8964,7 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input() .await .expect("steer pending input into active turn"); - sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None) + sess.on_task_finished(Arc::clone(&tc), /*task_result*/ Ok(None)) .await; let history = sess.clone_history().await; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 25e4e1a6a..42fab7a50 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -144,7 +144,7 @@ pub(crate) async fn run_turn( input: Vec, prewarmed_client_session: Option, cancellation_token: CancellationToken, -) -> Option { +) -> CodexResult> { let mut client_session = prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session()); // TODO(ccunningham): Pre-turn compaction runs before context updates and the @@ -152,25 +152,31 @@ pub(crate) async fn run_turn( // diffs/full reinjection + user input) and trigger compaction preemptively // when they would push the thread over the compaction threshold. if let Err(err) = run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await { + if matches!(err, CodexErr::TurnAborted) { + return Err(err); + } let error = err.to_codex_protocol_error(); sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone()) .await; error!("Failed to run pre-sampling compact"); - return None; + return Ok(None); } sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref()) .await; - let (injection_items, explicitly_enabled_connectors) = - build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await?; + let Some((injection_items, explicitly_enabled_connectors)) = + build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await + else { + return Ok(None); + }; if run_pending_session_start_hooks(&sess, &turn_context).await { - return None; + return Ok(None); } let mut can_drain_pending_input = input.is_empty(); if run_hooks_and_record_inputs(&sess, &turn_context, &input).await { - return None; + return Ok(None); } sess.merge_connector_selection(explicitly_enabled_connectors.clone()) @@ -336,10 +342,13 @@ pub(crate) async fn run_turn( ) .await { + if matches!(err, CodexErr::TurnAborted) { + return Err(err); + } let error = err.to_codex_protocol_error(); sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone()) .await; - return None; + return Ok(None); } can_drain_pending_input = !model_needs_follow_up; continue; @@ -386,15 +395,14 @@ pub(crate) async fn run_turn( ) .await { - return None; + return Ok(None); } break; } continue; } - Err(CodexErr::TurnAborted) => { - // Aborted turn is reported via a different event. - break; + Err(err @ CodexErr::TurnAborted) => { + return Err(err); } Err(codex_error @ CodexErr::InvalidImageRequest()) => { { @@ -433,7 +441,7 @@ pub(crate) async fn run_turn( } } - last_agent_message + Ok(last_agent_message) } #[instrument(level = "trace", skip_all)] @@ -2197,10 +2205,14 @@ async fn try_run_sampling_request( &mut assistant_message_stream_parsers, ) .await; - sess.record_token_usage_info(&turn_context, token_usage.as_ref()) + let budget_result = sess + .record_token_usage_info(&turn_context, token_usage.as_ref()) .await; should_emit_token_count = true; should_emit_turn_diff = true; + if let Err(err) = budget_result { + break Err(err); + } if let Some(false) = end_turn { needs_follow_up = true; } diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index dd9d6cb07..2c45fd8ee 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -2,10 +2,12 @@ use std::sync::Arc; use super::SessionTask; use super::SessionTaskContext; +use super::SessionTaskResult; use super::emit_compact_metric; use crate::session::TurnInput; use crate::session::turn_context::TurnContext; use crate::state::TaskKind; +use codex_protocol::error::CodexErr; use codex_protocol::user_input::UserInput; use tokio_util::sync::CancellationToken; @@ -27,9 +29,9 @@ impl SessionTask for CompactTask { ctx: Arc, _input: Vec, _cancellation_token: CancellationToken, - ) -> Option { + ) -> SessionTaskResult { let session = session.clone_session(); - let _ = if crate::compact::should_use_remote_compact_task(ctx.provider.info()) { + let result = if crate::compact::should_use_remote_compact_task(ctx.provider.info()) { if ctx .config .features @@ -67,6 +69,9 @@ impl SessionTask for CompactTask { }]; crate::compact::run_compact_task(session.clone(), ctx, input).await }; - None + if let Err(err @ CodexErr::TurnAborted) = result { + return Err(err); + } + Ok(None) } } diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 3ff0e2d5f..b24d43df3 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -54,6 +54,8 @@ use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::WarningEvent; use codex_features::Feature; +use codex_protocol::error::CodexErr; +use codex_protocol::error::Result as CodexResult; use codex_protocol::models::ContentItem; pub(crate) use compact::CompactTask; pub(crate) use regular::RegularTask; @@ -65,6 +67,8 @@ pub(crate) use user_shell::execute_user_shell_command; const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100; const TASK_COMPACT_METRIC: &str = "codex.task.compact"; +pub(crate) type SessionTaskResult = CodexResult>; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum InterruptedTurnHistoryMarker { Disabled, @@ -222,14 +226,16 @@ pub(crate) trait SessionTask: Send + Sync + 'static { /// provided `cancellation_token` is cancelled when the session requests an /// abort; implementers should watch for it and terminate quickly once it /// fires. Returning [`Some`] yields a final message that - /// [`Session::on_task_finished`] will emit to the client. + /// [`Session::on_task_finished`] will emit to the client. Returning + /// [`CodexErr::TurnAborted`] completes the task through the aborted-turn + /// lifecycle instead. fn run( self: Arc, session: Arc, ctx: Arc, input: Vec, cancellation_token: CancellationToken, - ) -> impl std::future::Future> + Send; + ) -> impl std::future::Future + Send; /// Gives the task a chance to perform cleanup after an abort. /// @@ -258,7 +264,7 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static { ctx: Arc, input: Vec, cancellation_token: CancellationToken, - ) -> BoxFuture<'static, Option>; + ) -> BoxFuture<'static, SessionTaskResult>; fn abort<'a>( &'a self, @@ -285,7 +291,7 @@ where ctx: Arc, input: Vec, cancellation_token: CancellationToken, - ) -> BoxFuture<'static, Option> { + ) -> BoxFuture<'static, SessionTaskResult> { Box::pin(SessionTask::run( self, session, @@ -395,7 +401,7 @@ impl Session { let handle = tokio::spawn( async move { let ctx_for_finish = Arc::clone(&ctx); - let last_agent_message = task_for_run + let task_result = task_for_run .run( Arc::clone(&session_ctx), ctx, @@ -418,8 +424,8 @@ impl Session { .await; } if !task_cancellation_token.is_cancelled() { - // Emit completion uniformly from spawn site so all tasks share the same lifecycle. - sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message) + // Finish uniformly from the spawn site so all tasks share the same lifecycle. + sess.on_task_finished(Arc::clone(&ctx_for_finish), task_result) .await; } done_clone.notify_waiters(); @@ -557,8 +563,16 @@ impl Session { pub async fn on_task_finished( self: &Arc, turn_context: Arc, - last_agent_message: Option, + task_result: SessionTaskResult, ) { + let (last_agent_message, abort_reason) = match task_result { + Ok(last_agent_message) => (last_agent_message, None), + Err(CodexErr::TurnAborted) => (None, Some(TurnAbortReason::Interrupted)), + Err(err) => { + warn!(%err, "session task returned an unexpected error"); + (None, None) + } + }; turn_context .turn_metadata_state .cancel_git_enrichment_task(); @@ -730,25 +744,36 @@ impl Session { .turn_timing_state .completed_at_and_duration_ms() .await; - let time_to_first_token_ms = turn_context - .turn_timing_state - .time_to_first_token_ms() - .await; self.services .analytics_events_client .track_turn_profile(TurnProfileFact { turn_id: turn_context.sub_id.clone(), profile: turn_context.turn_timing_state.complete_profile(), }); - self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref()) - .await; - let event = EventMsg::TurnComplete(TurnCompleteEvent { - turn_id: turn_context.sub_id.clone(), - last_agent_message, - completed_at, - duration_ms, - time_to_first_token_ms, - }); + let event = if let Some(reason) = abort_reason { + self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref()) + .await; + EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some(turn_context.sub_id.clone()), + reason, + completed_at, + duration_ms, + }) + } else { + let time_to_first_token_ms = turn_context + .turn_timing_state + .time_to_first_token_ms() + .await; + self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref()) + .await; + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: turn_context.sub_id.clone(), + last_agent_message, + completed_at, + duration_ms, + time_to_first_token_ms, + }) + }; self.send_event(turn_context.as_ref(), event).await; self.services .guardian_rejection_circuit_breaker diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index c5cc2da31..40837728a 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -14,6 +14,7 @@ use tracing::trace_span; use super::SessionTask; use super::SessionTaskContext; +use super::SessionTaskResult; #[derive(Default)] pub(crate) struct RegularTask; @@ -39,7 +40,7 @@ impl SessionTask for RegularTask { ctx: Arc, input: Vec, cancellation_token: CancellationToken, - ) -> Option { + ) -> SessionTaskResult { let sess = session.clone_session(); let turn_extension_data = session.turn_extension_data(); let run_turn_span = trace_span!("run_turn"); @@ -61,7 +62,7 @@ impl SessionTask for RegularTask { .instrument(trace_span!("regular_task.prepare_run_turn")) .await; let prewarmed_client_session = match prewarmed_client_session { - SessionStartupPrewarmResolution::Cancelled => return None, + SessionStartupPrewarmResolution::Cancelled => return Ok(None), SessionStartupPrewarmResolution::Unavailable { .. } => None, SessionStartupPrewarmResolution::Ready(prewarmed_client_session) => { Some(*prewarmed_client_session) @@ -79,9 +80,9 @@ impl SessionTask for RegularTask { cancellation_token.child_token(), ) .instrument(run_turn_span.clone()) - .await; + .await?; if !sess.input_queue.has_pending_input(&sess.active_turn).await { - return last_agent_message; + return Ok(last_agent_message); } next_input = Vec::new(); } diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index bb8670237..7c18a5f78 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -29,6 +29,7 @@ use codex_protocol::user_input::UserInput; use super::SessionTask; use super::SessionTaskContext; +use super::SessionTaskResult; #[derive(Clone, Copy)] pub(crate) struct ReviewTask; @@ -54,7 +55,7 @@ impl SessionTask for ReviewTask { ctx: Arc, input: Vec, cancellation_token: CancellationToken, - ) -> Option { + ) -> SessionTaskResult { session.session.services.session_telemetry.counter( "codex.task.review", /*inc*/ 1, @@ -84,7 +85,7 @@ impl SessionTask for ReviewTask { if !cancellation_token.is_cancelled() { exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await; } - None + Ok(None) } async fn abort(&self, session: Arc, ctx: Arc) { diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index 7cb9841db..c5b44a649 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -41,6 +41,7 @@ use codex_shell_command::parse_command::parse_command; use super::SessionTask; use super::SessionTaskContext; +use super::SessionTaskResult; use crate::session::session::Session; use codex_protocol::models::PermissionProfile; @@ -82,7 +83,7 @@ impl SessionTask for UserShellCommandTask { turn_context: Arc, _input: Vec, cancellation_token: CancellationToken, - ) -> Option { + ) -> SessionTaskResult { execute_user_shell_command( session.clone_session(), turn_context, @@ -91,7 +92,7 @@ impl SessionTask for UserShellCommandTask { UserShellCommandMode::StandaloneTurn, ) .await; - None + Ok(None) } } diff --git a/codex-rs/core/tests/suite/rollout_budget.rs b/codex-rs/core/tests/suite/rollout_budget.rs index c2a044eb5..61c6dfe42 100644 --- a/codex-rs/core/tests/suite/rollout_budget.rs +++ b/codex-rs/core/tests/suite/rollout_budget.rs @@ -4,6 +4,8 @@ use codex_features::Feature; use codex_model_provider_info::built_in_model_providers; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; +use codex_protocol::protocol::TurnAbortReason; +use codex_protocol::user_input::UserInput; use core_test_support::responses::ResponsesRequest; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -20,6 +22,7 @@ use core_test_support::wait_for_event; use pretty_assertions::assert_eq; use serde_json::json; use std::time::Duration; +use test_case::test_case; use tokio::time::timeout; const ROLLOUT_BUDGET: RolloutBudgetConfig = RolloutBudgetConfig { @@ -192,6 +195,136 @@ async fn subagent_usage_draws_from_the_shared_budget() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exhausted_budget_aborts_current_and_later_turns() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("exhaust-budget"), + ev_completed_with_tokens("exhaust-budget", /*total_tokens*/ 30), + ]), + sse(vec![ + ev_response_created("already-exhausted"), + ev_completed_with_tokens("already-exhausted", /*total_tokens*/ 1), + ]), + ], + ) + .await; + let test = test_codex() + .with_config(|config| { + config.rollout_budget = Some(RolloutBudgetConfig { + limit_tokens: 30, + reminder_interval_tokens: 10, + ..ROLLOUT_BUDGET + }); + }) + .build(&server) + .await?; + + for prompt in ["exhaust the budget", "try another turn"] { + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: prompt.to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + additional_context: Default::default(), + thread_settings: Default::default(), + }) + .await?; + + let event = wait_for_event(&test.codex, |event| match event { + EventMsg::TurnAborted(_) => true, + EventMsg::TurnComplete(_) => { + panic!("exhausted budget completed the turn instead of aborting") + } + _ => false, + }) + .await; + let EventMsg::TurnAborted(abort) = event else { + unreachable!("event filter only accepts TurnAborted") + }; + assert_eq!(abort.reason, TurnAbortReason::Interrupted); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[test_case(false ; "local")] +#[test_case(true ; "remote_v2")] +async fn compaction_budget_exhaustion_aborts_without_error_or_retry(remote_v2: bool) -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let compact_response = if remote_v2 { + sse(vec![ + json!({ + "type": "response.output_item.done", + "item": { + "type": "compaction", + "encrypted_content": "encrypted-summary", + } + }), + ev_completed_with_tokens("compact", /*total_tokens*/ 10), + ]) + } else { + sse(vec![ + ev_response_created("compact"), + ev_assistant_message("compact-summary", "compact summary"), + ev_completed_with_tokens("compact", /*total_tokens*/ 10), + ]) + }; + let responses = mount_sse_sequence(&server, vec![compact_response]).await; + let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone(); + model_provider.base_url = Some(format!("{}/v1", server.uri())); + model_provider.supports_websockets = false; + if !remote_v2 { + model_provider.name = "OpenAI-compatible test provider".to_string(); + } + let test = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + config.rollout_budget = Some(RolloutBudgetConfig { + limit_tokens: 10, + reminder_interval_tokens: 5, + ..ROLLOUT_BUDGET + }); + if remote_v2 { + config + .features + .enable(Feature::RemoteCompactionV2) + .expect("test config should allow remote compaction v2"); + } + }) + .build(&server) + .await?; + + test.codex.submit(Op::Compact).await?; + let event = wait_for_event(&test.codex, |event| match event { + EventMsg::TurnAborted(_) => true, + EventMsg::Error(error) => panic!("budget exhaustion emitted an error: {}", error.message), + EventMsg::TurnComplete(_) => { + panic!("budget-exhausting compaction completed instead of aborting") + } + _ => false, + }) + .await; + let EventMsg::TurnAborted(abort) = event else { + unreachable!("event filter only accepts TurnAborted") + }; + assert_eq!(abort.reason, TurnAbortReason::Interrupted); + assert_eq!(responses.requests().len(), 1, "compaction should not retry"); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn restates_the_current_remainder_after_compaction() -> Result<()> { skip_if_no_network!(Ok(()));