[codex] abort turns when rollout budgets expire (token budget 3/3) (#28707)

## Stack

Depends on #28494.

## Description

This PR propagates shared rollout-budget exhaustion through the existing
`CodexErr::TurnAborted` task result.

Each thread records its model usage against the same ledger. Once the
ledger is exhausted, that usage update and all later usage updates
return `TurnAborted`. The task wrapper emits the normal aborted-turn
event and lifecycle instead of completing the turn.

This is intentionally a soft boundary: there is no cross-thread
`Op::Interrupt` fanout. An in-flight thread can finish its current
response before it observes the exhausted ledger, but every thread
aborts at its next usage-accounting boundary.

## Tests

The integration coverage verifies that:

- the response that exhausts the budget aborts its turn;
- a later response also aborts because the shared ledger remains
exhausted; and
- sub-agent usage draws from the same shared ledger; and
- local and remote-v2 compaction abort without retrying or emitting a
generic error.

Local checks:

- `just test -p codex-core
exhausted_budget_aborts_current_and_later_turns`
- `just test -p codex-core subagent_usage_draws_from_the_shared_budget`
- `just test -p codex-core
abort_regular_task_emits_marker_before_turn_aborted`
- `just test -p codex-core
compaction_budget_exhaustion_aborts_without_error_or_retry`
- `just fix -p codex-core`
- `just fmt`
- `git diff --check`

The full workspace test suite was not run locally.
This commit is contained in:
rka-oai
2026-06-19 02:00:01 -07:00
committed by GitHub
Unverified
parent 7abfcf220b
commit dac588f413
13 changed files with 266 additions and 69 deletions
+3 -3
View File
@@ -254,8 +254,8 @@ async fn run_compact_task_inner_impl(
Ok(()) => {
break;
}
Err(CodexErr::Interrupted) => {
return Err(CodexErr::Interrupted);
Err(err @ (CodexErr::Interrupted | CodexErr::TurnAborted)) => {
return Err(err);
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
@@ -657,7 +657,7 @@ async fn drain_to_completed(
}
Ok(ResponseEvent::Completed { token_usage, .. }) => {
sess.update_token_usage_info(turn_context, token_usage.as_ref())
.await;
.await?;
return Ok(());
}
Ok(_) => continue,
+4 -1
View File
@@ -165,6 +165,9 @@ async fn run_remote_compact_task_inner(
attempt
.track(sess.as_ref(), status, codex_error, analytics_details)
.await;
if matches!(&result, Err(CodexErr::TurnAborted)) {
return result;
}
if let Err(err) = result {
sess.track_turn_codex_error(turn_context, &err);
let event = EventMsg::Error(
@@ -281,7 +284,7 @@ async fn run_remote_compact_task_inner_impl(
token_usage,
} = compaction_output_result?;
if let Some(token_usage) = token_usage {
sess.record_rollout_budget_usage(&token_usage);
sess.record_rollout_budget_usage(&token_usage)?;
analytics_details.active_context_tokens_before = Some(token_usage.input_tokens);
analytics_details.compaction_summary_tokens = Some(token_usage.output_tokens);
analytics_details.cached_input_tokens = Some(token_usage.cached_input_tokens);
+4 -2
View File
@@ -40,13 +40,15 @@ impl RolloutBudget {
});
}
pub(crate) fn record_usage(&self, usage: &TokenUsage) {
/// Returns true once the configured budget is exhausted, including on later calls.
pub(crate) fn record_usage(&self, usage: &TokenUsage) -> bool {
let Some(mut state) = self.lock() else {
return;
return false;
};
state.weighted_tokens_used += usage.output_tokens.max(0) as f64
* state.config.sampling_token_weight
+ usage.non_cached_input() as f64 * state.config.prefill_token_weight;
state.weighted_tokens_used >= state.config.limit_tokens as f64
}
pub(crate) fn pending_reminder(
+8 -4
View File
@@ -3410,17 +3410,19 @@ impl Session {
&self,
turn_context: &TurnContext,
token_usage: Option<&TokenUsage>,
) {
self.record_token_usage_info(turn_context, token_usage)
) -> CodexResult<()> {
let result = self
.record_token_usage_info(turn_context, token_usage)
.await;
self.send_token_count_event(turn_context).await;
result
}
pub(crate) async fn record_token_usage_info(
&self,
turn_context: &TurnContext,
token_usage: Option<&TokenUsage>,
) {
) -> CodexResult<()> {
if let Some(token_usage) = token_usage {
let token_info = {
let mut state = self.state.lock().await;
@@ -3434,7 +3436,7 @@ impl Session {
}
state.token_info()
};
self.record_rollout_budget_usage(token_usage);
let budget_result = self.record_rollout_budget_usage(token_usage);
if let Some(token_info) = token_info.as_ref() {
for contributor in self.services.extensions.token_usage_contributors() {
contributor
@@ -3447,7 +3449,9 @@ impl Session {
.await;
}
}
budget_result?;
}
Ok(())
}
pub(crate) async fn recompute_token_usage(&self, turn_context: &TurnContext) {
+10 -3
View File
@@ -1,6 +1,8 @@
use super::session::Session;
use super::turn_context::TurnContext;
use crate::context::ContextualUserFragment;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::protocol::TokenUsage;
pub(super) async fn maybe_record_reminder(
@@ -21,10 +23,15 @@ pub(super) async fn maybe_record_reminder(
}
impl Session {
pub(crate) fn record_rollout_budget_usage(&self, usage: &TokenUsage) {
self.services
pub(crate) fn record_rollout_budget_usage(&self, usage: &TokenUsage) -> CodexResult<()> {
if self
.services
.agent_control
.rollout_budget()
.record_usage(usage);
.record_usage(usage)
{
return Err(CodexErr::TurnAborted);
}
Ok(())
}
}
+14 -11
View File
@@ -70,6 +70,7 @@ use crate::state::ActiveTurn;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use crate::tasks::SessionTaskResult;
use crate::tasks::UserShellCommandMode;
use crate::tasks::execute_user_shell_command;
use crate::tools::ToolRouter;
@@ -2150,10 +2151,12 @@ async fn record_token_usage_info_notifies_extension_contributors() {
session
.record_token_usage_info(&turn_context, Some(&first_usage))
.await;
.await
.expect("first usage should be recorded");
session
.record_token_usage_info(&turn_context, Some(&second_usage))
.await;
.await
.expect("second usage should be recorded");
let mut expected_total_usage = first_usage.clone();
expected_total_usage.add_assign(&second_usage);
@@ -6474,13 +6477,13 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
_ctx: Arc<TurnContext>,
_input: Vec<TurnInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
) -> SessionTaskResult {
let mut trace = self
.captured_trace
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*trace = current_span_w3c_trace_context();
None
Ok(None)
}
}
@@ -8676,8 +8679,8 @@ impl SessionTask for CompletingTask {
_ctx: Arc<TurnContext>,
_input: Vec<TurnInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
None
) -> SessionTaskResult {
Ok(None)
}
}
@@ -8702,10 +8705,10 @@ impl SessionTask for NeverEndingTask {
_ctx: Arc<TurnContext>,
_input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> SessionTaskResult {
if self.listen_to_cancellation_token {
cancellation_token.cancelled().await;
return None;
return Ok(None);
}
loop {
sleep(Duration::from_secs(60)).await;
@@ -8731,14 +8734,14 @@ impl SessionTask for GuardianDeniedApprovalTask {
ctx: Arc<TurnContext>,
_input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> SessionTaskResult {
let session = session.clone_session();
for _ in 0..3 {
crate::guardian::record_guardian_denial_for_test(&session, &ctx, &ctx.sub_id).await;
}
cancellation_token.cancelled().await;
None
Ok(None)
}
}
@@ -8961,7 +8964,7 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
.await
.expect("steer pending input into active turn");
sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None)
sess.on_task_finished(Arc::clone(&tc), /*task_result*/ Ok(None))
.await;
let history = sess.clone_history().await;
+25 -13
View File
@@ -144,7 +144,7 @@ pub(crate) async fn run_turn(
input: Vec<TurnInput>,
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> CodexResult<Option<String>> {
let mut client_session =
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
// TODO(ccunningham): Pre-turn compaction runs before context updates and the
@@ -152,25 +152,31 @@ pub(crate) async fn run_turn(
// diffs/full reinjection + user input) and trigger compaction preemptively
// when they would push the thread over the compaction threshold.
if let Err(err) = run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await {
if matches!(err, CodexErr::TurnAborted) {
return Err(err);
}
let error = err.to_codex_protocol_error();
sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone())
.await;
error!("Failed to run pre-sampling compact");
return None;
return Ok(None);
}
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
.await;
let (injection_items, explicitly_enabled_connectors) =
build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await?;
let Some((injection_items, explicitly_enabled_connectors)) =
build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await
else {
return Ok(None);
};
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
return Ok(None);
}
let mut can_drain_pending_input = input.is_empty();
if run_hooks_and_record_inputs(&sess, &turn_context, &input).await {
return None;
return Ok(None);
}
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
@@ -336,10 +342,13 @@ pub(crate) async fn run_turn(
)
.await
{
if matches!(err, CodexErr::TurnAborted) {
return Err(err);
}
let error = err.to_codex_protocol_error();
sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone())
.await;
return None;
return Ok(None);
}
can_drain_pending_input = !model_needs_follow_up;
continue;
@@ -386,15 +395,14 @@ pub(crate) async fn run_turn(
)
.await
{
return None;
return Ok(None);
}
break;
}
continue;
}
Err(CodexErr::TurnAborted) => {
// Aborted turn is reported via a different event.
break;
Err(err @ CodexErr::TurnAborted) => {
return Err(err);
}
Err(codex_error @ CodexErr::InvalidImageRequest()) => {
{
@@ -433,7 +441,7 @@ pub(crate) async fn run_turn(
}
}
last_agent_message
Ok(last_agent_message)
}
#[instrument(level = "trace", skip_all)]
@@ -2197,10 +2205,14 @@ async fn try_run_sampling_request(
&mut assistant_message_stream_parsers,
)
.await;
sess.record_token_usage_info(&turn_context, token_usage.as_ref())
let budget_result = sess
.record_token_usage_info(&turn_context, token_usage.as_ref())
.await;
should_emit_token_count = true;
should_emit_turn_diff = true;
if let Err(err) = budget_result {
break Err(err);
}
if let Some(false) = end_turn {
needs_follow_up = true;
}
+8 -3
View File
@@ -2,10 +2,12 @@ use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
use super::SessionTaskResult;
use super::emit_compact_metric;
use crate::session::TurnInput;
use crate::session::turn_context::TurnContext;
use crate::state::TaskKind;
use codex_protocol::error::CodexErr;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
@@ -27,9 +29,9 @@ impl SessionTask for CompactTask {
ctx: Arc<TurnContext>,
_input: Vec<TurnInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
) -> SessionTaskResult {
let session = session.clone_session();
let _ = if crate::compact::should_use_remote_compact_task(ctx.provider.info()) {
let result = if crate::compact::should_use_remote_compact_task(ctx.provider.info()) {
if ctx
.config
.features
@@ -67,6 +69,9 @@ impl SessionTask for CompactTask {
}];
crate::compact::run_compact_task(session.clone(), ctx, input).await
};
None
if let Err(err @ CodexErr::TurnAborted) = result {
return Err(err);
}
Ok(None)
}
}
+46 -21
View File
@@ -54,6 +54,8 @@ use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::WarningEvent;
use codex_features::Feature;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::models::ContentItem;
pub(crate) use compact::CompactTask;
pub(crate) use regular::RegularTask;
@@ -65,6 +67,8 @@ pub(crate) use user_shell::execute_user_shell_command;
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
const TASK_COMPACT_METRIC: &str = "codex.task.compact";
pub(crate) type SessionTaskResult = CodexResult<Option<String>>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InterruptedTurnHistoryMarker {
Disabled,
@@ -222,14 +226,16 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
/// provided `cancellation_token` is cancelled when the session requests an
/// abort; implementers should watch for it and terminate quickly once it
/// fires. Returning [`Some`] yields a final message that
/// [`Session::on_task_finished`] will emit to the client.
/// [`Session::on_task_finished`] will emit to the client. Returning
/// [`CodexErr::TurnAborted`] completes the task through the aborted-turn
/// lifecycle instead.
fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> impl std::future::Future<Output = Option<String>> + Send;
) -> impl std::future::Future<Output = SessionTaskResult> + Send;
/// Gives the task a chance to perform cleanup after an abort.
///
@@ -258,7 +264,7 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static {
ctx: Arc<TurnContext>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> BoxFuture<'static, Option<String>>;
) -> BoxFuture<'static, SessionTaskResult>;
fn abort<'a>(
&'a self,
@@ -285,7 +291,7 @@ where
ctx: Arc<TurnContext>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> BoxFuture<'static, Option<String>> {
) -> BoxFuture<'static, SessionTaskResult> {
Box::pin(SessionTask::run(
self,
session,
@@ -395,7 +401,7 @@ impl Session {
let handle = tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);
let last_agent_message = task_for_run
let task_result = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
@@ -418,8 +424,8 @@ impl Session {
.await;
}
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
// Finish uniformly from the spawn site so all tasks share the same lifecycle.
sess.on_task_finished(Arc::clone(&ctx_for_finish), task_result)
.await;
}
done_clone.notify_waiters();
@@ -557,8 +563,16 @@ impl Session {
pub async fn on_task_finished(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
last_agent_message: Option<String>,
task_result: SessionTaskResult,
) {
let (last_agent_message, abort_reason) = match task_result {
Ok(last_agent_message) => (last_agent_message, None),
Err(CodexErr::TurnAborted) => (None, Some(TurnAbortReason::Interrupted)),
Err(err) => {
warn!(%err, "session task returned an unexpected error");
(None, None)
}
};
turn_context
.turn_metadata_state
.cancel_git_enrichment_task();
@@ -730,25 +744,36 @@ impl Session {
.turn_timing_state
.completed_at_and_duration_ms()
.await;
let time_to_first_token_ms = turn_context
.turn_timing_state
.time_to_first_token_ms()
.await;
self.services
.analytics_events_client
.track_turn_profile(TurnProfileFact {
turn_id: turn_context.sub_id.clone(),
profile: turn_context.turn_timing_state.complete_profile(),
});
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref())
.await;
let event = EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_context.sub_id.clone(),
last_agent_message,
completed_at,
duration_ms,
time_to_first_token_ms,
});
let event = if let Some(reason) = abort_reason {
self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref())
.await;
EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some(turn_context.sub_id.clone()),
reason,
completed_at,
duration_ms,
})
} else {
let time_to_first_token_ms = turn_context
.turn_timing_state
.time_to_first_token_ms()
.await;
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref())
.await;
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_context.sub_id.clone(),
last_agent_message,
completed_at,
duration_ms,
time_to_first_token_ms,
})
};
self.send_event(turn_context.as_ref(), event).await;
self.services
.guardian_rejection_circuit_breaker
+5 -4
View File
@@ -14,6 +14,7 @@ use tracing::trace_span;
use super::SessionTask;
use super::SessionTaskContext;
use super::SessionTaskResult;
#[derive(Default)]
pub(crate) struct RegularTask;
@@ -39,7 +40,7 @@ impl SessionTask for RegularTask {
ctx: Arc<TurnContext>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> SessionTaskResult {
let sess = session.clone_session();
let turn_extension_data = session.turn_extension_data();
let run_turn_span = trace_span!("run_turn");
@@ -61,7 +62,7 @@ impl SessionTask for RegularTask {
.instrument(trace_span!("regular_task.prepare_run_turn"))
.await;
let prewarmed_client_session = match prewarmed_client_session {
SessionStartupPrewarmResolution::Cancelled => return None,
SessionStartupPrewarmResolution::Cancelled => return Ok(None),
SessionStartupPrewarmResolution::Unavailable { .. } => None,
SessionStartupPrewarmResolution::Ready(prewarmed_client_session) => {
Some(*prewarmed_client_session)
@@ -79,9 +80,9 @@ impl SessionTask for RegularTask {
cancellation_token.child_token(),
)
.instrument(run_turn_span.clone())
.await;
.await?;
if !sess.input_queue.has_pending_input(&sess.active_turn).await {
return last_agent_message;
return Ok(last_agent_message);
}
next_input = Vec::new();
}
+3 -2
View File
@@ -29,6 +29,7 @@ use codex_protocol::user_input::UserInput;
use super::SessionTask;
use super::SessionTaskContext;
use super::SessionTaskResult;
#[derive(Clone, Copy)]
pub(crate) struct ReviewTask;
@@ -54,7 +55,7 @@ impl SessionTask for ReviewTask {
ctx: Arc<TurnContext>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> SessionTaskResult {
session.session.services.session_telemetry.counter(
"codex.task.review",
/*inc*/ 1,
@@ -84,7 +85,7 @@ impl SessionTask for ReviewTask {
if !cancellation_token.is_cancelled() {
exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await;
}
None
Ok(None)
}
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
+3 -2
View File
@@ -41,6 +41,7 @@ use codex_shell_command::parse_command::parse_command;
use super::SessionTask;
use super::SessionTaskContext;
use super::SessionTaskResult;
use crate::session::session::Session;
use codex_protocol::models::PermissionProfile;
@@ -82,7 +83,7 @@ impl SessionTask for UserShellCommandTask {
turn_context: Arc<TurnContext>,
_input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> SessionTaskResult {
execute_user_shell_command(
session.clone_session(),
turn_context,
@@ -91,7 +92,7 @@ impl SessionTask for UserShellCommandTask {
UserShellCommandMode::StandaloneTurn,
)
.await;
None
Ok(None)
}
}
+133
View File
@@ -4,6 +4,8 @@ use codex_features::Feature;
use codex_model_provider_info::built_in_model_providers;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
@@ -20,6 +22,7 @@ use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::time::Duration;
use test_case::test_case;
use tokio::time::timeout;
const ROLLOUT_BUDGET: RolloutBudgetConfig = RolloutBudgetConfig {
@@ -192,6 +195,136 @@ async fn subagent_usage_draws_from_the_shared_budget() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exhausted_budget_aborts_current_and_later_turns() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("exhaust-budget"),
ev_completed_with_tokens("exhaust-budget", /*total_tokens*/ 30),
]),
sse(vec![
ev_response_created("already-exhausted"),
ev_completed_with_tokens("already-exhausted", /*total_tokens*/ 1),
]),
],
)
.await;
let test = test_codex()
.with_config(|config| {
config.rollout_budget = Some(RolloutBudgetConfig {
limit_tokens: 30,
reminder_interval_tokens: 10,
..ROLLOUT_BUDGET
});
})
.build(&server)
.await?;
for prompt in ["exhaust the budget", "try another turn"] {
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: prompt.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
let event = wait_for_event(&test.codex, |event| match event {
EventMsg::TurnAborted(_) => true,
EventMsg::TurnComplete(_) => {
panic!("exhausted budget completed the turn instead of aborting")
}
_ => false,
})
.await;
let EventMsg::TurnAborted(abort) = event else {
unreachable!("event filter only accepts TurnAborted")
};
assert_eq!(abort.reason, TurnAbortReason::Interrupted);
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[test_case(false ; "local")]
#[test_case(true ; "remote_v2")]
async fn compaction_budget_exhaustion_aborts_without_error_or_retry(remote_v2: bool) -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let compact_response = if remote_v2 {
sse(vec![
json!({
"type": "response.output_item.done",
"item": {
"type": "compaction",
"encrypted_content": "encrypted-summary",
}
}),
ev_completed_with_tokens("compact", /*total_tokens*/ 10),
])
} else {
sse(vec![
ev_response_created("compact"),
ev_assistant_message("compact-summary", "compact summary"),
ev_completed_with_tokens("compact", /*total_tokens*/ 10),
])
};
let responses = mount_sse_sequence(&server, vec![compact_response]).await;
let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone();
model_provider.base_url = Some(format!("{}/v1", server.uri()));
model_provider.supports_websockets = false;
if !remote_v2 {
model_provider.name = "OpenAI-compatible test provider".to_string();
}
let test = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
config.rollout_budget = Some(RolloutBudgetConfig {
limit_tokens: 10,
reminder_interval_tokens: 5,
..ROLLOUT_BUDGET
});
if remote_v2 {
config
.features
.enable(Feature::RemoteCompactionV2)
.expect("test config should allow remote compaction v2");
}
})
.build(&server)
.await?;
test.codex.submit(Op::Compact).await?;
let event = wait_for_event(&test.codex, |event| match event {
EventMsg::TurnAborted(_) => true,
EventMsg::Error(error) => panic!("budget exhaustion emitted an error: {}", error.message),
EventMsg::TurnComplete(_) => {
panic!("budget-exhausting compaction completed instead of aborting")
}
_ => false,
})
.await;
let EventMsg::TurnAborted(abort) = event else {
unreachable!("event filter only accepts TurnAborted")
};
assert_eq!(abort.reason, TurnAbortReason::Interrupted);
assert_eq!(responses.requests().len(), 1, "compaction should not retry");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn restates_the_current_remainder_after_compaction() -> Result<()> {
skip_if_no_network!(Ok(()));