From 32a696dbacaa1383745455ea2a77d5477891ed0b Mon Sep 17 00:00:00 2001 From: rka-oai Date: Thu, 18 Jun 2026 11:52:19 -0700 Subject: [PATCH] [codex] rollout budget implementation (varlength 2/N) (#28494) ## Stack Depends on #28746. This PR implements shared rollout-budget accounting and model-visible reminders using the configuration defined in #28746. # Description / Main changes to Core: `AgentControl` will now be the area where "rollout level" features & accounting will have to live. It is incorrectly named for this responsibility, but I think it can hold all the necessary shared state & features (rollout token budget, mutliple thread interruption responsibilitym etc) In this PR, we have one "token ledger" that each thread will subtract from when sampling. The "charge" will occur when response.completed() is done and the calculation will be done on the responses api usage carrier. The calculation will weigh sampling and pre-fill tokens as specified. Every time the budget crosses the configured reminder threshold, a developer message is appended before the thread's next request This remaining budget will _always_ be restated/reminded after a compaction event. Expiration and fan-out interruption will be in the stacked follow-up (and also live in Agent Control). ## Reminders "You have weighted {session_tokens_left} tokens left in the shared session token budget." The first request in each thread context receives the current remainder. Later reminders are emitted after aggregate weighted usage crosses a configured interval. If several intervals are crossed before a thread sends another request, Core inserts one reminder with the latest remainder. Compaction response usage is charged before the next context starts. The next reminder is appended after the compaction summary, leaving the initial context content stable. ## Tests Integration coverage verifies: - weighted output and non-cached input accounting - initial and periodic reminders - shared accounting between a root and sub-agent - post-compaction remainder and message placement Local checks: - `just fmt` - `just test -p codex-core rollout_budget` - `git diff --check` The full workspace test suite was not run locally. --- codex-rs/core/src/agent/control.rs | 19 +- codex-rs/core/src/compact_remote_v2.rs | 1 + codex-rs/core/src/context/mod.rs | 2 + codex-rs/core/src/context/rollout_budget.rs | 27 ++ codex-rs/core/src/event_mapping.rs | 1 + codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/rollout_budget.rs | 108 +++++++ codex-rs/core/src/session/handlers.rs | 4 + codex-rs/core/src/session/mod.rs | 2 + codex-rs/core/src/session/rollout_budget.rs | 30 ++ codex-rs/core/src/session/turn.rs | 9 +- codex-rs/core/src/thread_manager.rs | 21 +- codex-rs/core/tests/suite/mod.rs | 1 + codex-rs/core/tests/suite/rollout_budget.rs | 304 ++++++++++++++++++++ 14 files changed, 521 insertions(+), 9 deletions(-) create mode 100644 codex-rs/core/src/context/rollout_budget.rs create mode 100644 codex-rs/core/src/rollout_budget.rs create mode 100644 codex-rs/core/src/session/rollout_budget.rs create mode 100644 codex-rs/core/tests/suite/rollout_budget.rs diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 01f0e1415..410c97cbd 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -6,7 +6,9 @@ use crate::agent::role::resolve_role_config; use crate::agent::status::is_final; use crate::codex_thread::ThreadConfigSnapshot; use crate::config::Config; +use crate::config::RolloutBudgetConfig; use crate::environment_selection::TurnEnvironmentSnapshot; +use crate::rollout_budget::RolloutBudget; use crate::session::emit_subagent_session_started; use crate::session_prefix::format_inter_agent_completion_message; use crate::session_prefix::format_subagent_context_line; @@ -100,15 +102,24 @@ pub(crate) struct AgentControl { state: Arc, v2_residency: Arc, agent_execution_limiter: Arc, + /// Session-scoped state shared by the root thread and every cloned sub-agent control handle. + rollout_budget: Arc, } impl AgentControl { /// Construct a new `AgentControl` that can spawn/message agents via the given manager state. - pub(crate) fn new(manager: Weak) -> Self { - Self { + pub(crate) fn new( + manager: Weak, + rollout_budget: Option, + ) -> Self { + let control = Self { manager, ..Default::default() + }; + if let Some(rollout_budget) = rollout_budget { + control.rollout_budget.configure(rollout_budget); } + control } pub(crate) fn with_session_id(mut self, session_id: SessionId, max_threads: usize) -> Self { @@ -121,6 +132,10 @@ impl AgentControl { self.session_id } + pub(crate) fn rollout_budget(&self) -> &RolloutBudget { + self.rollout_budget.as_ref() + } + /// Send rich user input items to an existing agent thread. pub(crate) async fn send_input( &self, diff --git a/codex-rs/core/src/compact_remote_v2.rs b/codex-rs/core/src/compact_remote_v2.rs index 6c3a93902..f9c2ddc71 100644 --- a/codex-rs/core/src/compact_remote_v2.rs +++ b/codex-rs/core/src/compact_remote_v2.rs @@ -284,6 +284,7 @@ async fn run_remote_compact_task_inner_impl( token_usage, } = compaction_output_result?; if let Some(token_usage) = token_usage { + sess.record_rollout_budget_usage(&token_usage); analytics_details.active_context_tokens_before = Some(token_usage.input_tokens); analytics_details.compaction_summary_tokens = Some(token_usage.output_tokens); analytics_details.cached_input_tokens = Some(token_usage.cached_input_tokens); diff --git a/codex-rs/core/src/context/mod.rs b/codex-rs/core/src/context/mod.rs index a1f204e47..7fc137578 100644 --- a/codex-rs/core/src/context/mod.rs +++ b/codex-rs/core/src/context/mod.rs @@ -24,6 +24,7 @@ mod realtime_end_instructions; mod realtime_start_instructions; mod realtime_start_with_instructions; mod recommended_plugins_instructions; +mod rollout_budget; mod subagent_notification; mod token_budget_context; mod turn_aborted; @@ -64,6 +65,7 @@ pub(crate) use realtime_end_instructions::RealtimeEndInstructions; pub(crate) use realtime_start_instructions::RealtimeStartInstructions; pub(crate) use realtime_start_with_instructions::RealtimeStartWithInstructions; pub(crate) use recommended_plugins_instructions::RecommendedPluginsInstructions; +pub(crate) use rollout_budget::RolloutBudgetContext; pub(crate) use subagent_notification::SubagentNotification; pub(crate) use token_budget_context::TokenBudgetContext; pub(crate) use token_budget_context::TokenBudgetRemainingContext; diff --git a/codex-rs/core/src/context/rollout_budget.rs b/codex-rs/core/src/context/rollout_budget.rs new file mode 100644 index 000000000..33ed724b8 --- /dev/null +++ b/codex-rs/core/src/context/rollout_budget.rs @@ -0,0 +1,27 @@ +use super::ContextualUserFragment; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct RolloutBudgetContext { + pub(crate) remaining_tokens: i64, +} + +impl ContextualUserFragment for RolloutBudgetContext { + fn role(&self) -> &'static str { + "developer" + } + + fn markers(&self) -> (&'static str, &'static str) { + Self::type_markers() + } + + fn type_markers() -> (&'static str, &'static str) { + ("\n", "\n") + } + + fn body(&self) -> String { + format!( + "You have {} weighted tokens left in the shared session token budget.", + self.remaining_tokens + ) + } +} diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index 9e96ebff5..73193984b 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -33,6 +33,7 @@ const CONTEXTUAL_DEVELOPER_PREFIXES: &[&str] = &[ SKILLS_INSTRUCTIONS_OPEN_TAG, "", "", + "", ]; pub(crate) fn is_contextual_user_message_content(message: &[ContentItem]) -> bool { diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 0641d7226..0e193baa1 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -131,6 +131,7 @@ pub use agents_md::DEFAULT_AGENTS_MD_FILENAME; pub use agents_md::LOCAL_AGENTS_MD_FILENAME; pub use agents_md::LoadedAgentsMd; mod rollout; +mod rollout_budget; pub(crate) mod safety; mod session_rollout_init_error; pub mod shell; diff --git a/codex-rs/core/src/rollout_budget.rs b/codex-rs/core/src/rollout_budget.rs new file mode 100644 index 000000000..accabac76 --- /dev/null +++ b/codex-rs/core/src/rollout_budget.rs @@ -0,0 +1,108 @@ +use crate::config::RolloutBudgetConfig; +use codex_protocol::ThreadId; +use codex_protocol::protocol::TokenUsage; +use std::collections::HashMap; +use std::sync::Mutex; +use std::sync::MutexGuard; +use std::sync::OnceLock; + +pub(crate) struct RolloutBudgetReminder { + pub(crate) remaining_tokens: i64, + reminder_index: i64, +} + +/// Shared accounting and reminder state for one root-thread session tree. +#[derive(Default)] +pub(crate) struct RolloutBudget { + state: OnceLock>, +} + +struct RolloutBudgetState { + config: RolloutBudgetConfig, + weighted_tokens_used: f64, + /// Last reminder delivered to each thread, so every thread observes crossed thresholds. + deliveries: HashMap, +} + +struct ThreadBudgetDelivery { + window_id: String, + reminder_index: i64, +} + +impl RolloutBudget { + pub(crate) fn configure(&self, config: RolloutBudgetConfig) { + self.state.get_or_init(|| { + Mutex::new(RolloutBudgetState { + config, + weighted_tokens_used: 0.0, + deliveries: HashMap::new(), + }) + }); + } + + pub(crate) fn record_usage(&self, usage: &TokenUsage) { + let Some(mut state) = self.lock() else { + return; + }; + state.weighted_tokens_used += usage.output_tokens.max(0) as f64 + * state.config.sampling_token_weight + + usage.non_cached_input() as f64 * state.config.prefill_token_weight; + } + + pub(crate) fn pending_reminder( + &self, + thread_id: ThreadId, + window_id: &str, + ) -> Option { + let state = self.lock()?; + let reminder_index = (state.weighted_tokens_used + / state.config.reminder_interval_tokens as f64) + .floor() as i64; + if state.deliveries.get(&thread_id).is_some_and(|delivery| { + delivery.window_id.as_str() == window_id && delivery.reminder_index >= reminder_index + }) { + return None; + } + Some(RolloutBudgetReminder { + remaining_tokens: (state.config.limit_tokens as f64 - state.weighted_tokens_used) + .max(0.0) + .floor() as i64, + reminder_index, + }) + } + + pub(crate) fn mark_reminder_delivered( + &self, + thread_id: ThreadId, + window_id: &str, + reminder: RolloutBudgetReminder, + ) { + // Mark delivery only after history insertion; cancellation before then should retry it. + let Some(mut state) = self.lock() else { + return; + }; + state.deliveries.insert( + thread_id, + ThreadBudgetDelivery { + window_id: window_id.to_string(), + reminder_index: reminder.reminder_index, + }, + ); + } + + /// Forces the next sampling request for `thread_id` to restate the current remainder. + pub(crate) fn rearm_reminder(&self, thread_id: ThreadId) { + let Some(mut state) = self.lock() else { + return; + }; + state.deliveries.remove(&thread_id); + } + + fn lock(&self) -> Option> { + self.state.get().map(|state| { + state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + }) + } +} diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index 4ed79d04e..5299747f3 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -525,6 +525,10 @@ pub async fn thread_rollback(sess: &Arc, sub_id: String, num_turns: u32 .collect::>(); sess.apply_rollout_reconstruction(turn_context.as_ref(), replay_items.as_slice()) .await; + sess.services + .agent_control + .rollout_budget() + .rearm_reminder(sess.thread_id()); sess.recompute_token_usage(turn_context.as_ref()).await; sess.persist_rollout_items(&[RolloutItem::EventMsg(rollback_msg.clone())]) diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 84e60a3d2..5aa56b68f 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -211,6 +211,7 @@ mod input_queue; mod mcp; mod multi_agents; mod review; +mod rollout_budget; mod rollout_reconstruction; #[allow(clippy::module_inception)] pub(crate) mod session; @@ -3360,6 +3361,7 @@ impl Session { } state.token_info() }; + self.record_rollout_budget_usage(token_usage); if let Some(token_info) = token_info.as_ref() { for contributor in self.services.extensions.token_usage_contributors() { contributor diff --git a/codex-rs/core/src/session/rollout_budget.rs b/codex-rs/core/src/session/rollout_budget.rs new file mode 100644 index 000000000..4627d95ee --- /dev/null +++ b/codex-rs/core/src/session/rollout_budget.rs @@ -0,0 +1,30 @@ +use super::session::Session; +use super::turn_context::TurnContext; +use crate::context::ContextualUserFragment; +use codex_protocol::protocol::TokenUsage; + +pub(super) async fn maybe_record_reminder( + sess: &Session, + turn_context: &TurnContext, + window_id: &str, +) { + let budget = sess.services.agent_control.rollout_budget(); + let Some(reminder) = budget.pending_reminder(sess.thread_id(), window_id) else { + return; + }; + let response_item = ContextualUserFragment::into(crate::context::RolloutBudgetContext { + remaining_tokens: reminder.remaining_tokens, + }); + sess.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) + .await; + budget.mark_reminder_delivered(sess.thread_id(), window_id, reminder); +} + +impl Session { + pub(crate) fn record_rollout_budget_usage(&self, usage: &TokenUsage) { + self.services + .agent_control + .rollout_budget() + .record_usage(usage); + } +} diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 257e74120..657e00ff3 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -218,6 +218,14 @@ pub(crate) async fn run_turn( break; } + let window_id = sess.current_window_id().await; + super::rollout_budget::maybe_record_reminder( + sess.as_ref(), + turn_context.as_ref(), + &window_id, + ) + .await; + // Construct the input that we will send to the model. let sampling_request_input: Vec = async { sess.clone_history() @@ -227,7 +235,6 @@ pub(crate) async fn run_turn( .instrument(trace_span!("run_turn.prepare_sampling_request_input")) .await; - let window_id = sess.current_window_id().await; let responses_metadata = turn_context.turn_metadata_state.to_responses_metadata( sess.installation_id.clone(), window_id, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 63335c012..b452dc45a 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -621,6 +621,7 @@ impl ThreadManager { options: StartThreadOptions, forked_from_thread_id: Option, ) -> CodexResult { + let agent_control = self.agent_control_for_config(&options.config); let (resumed_session_source, resumed_thread_source) = options .initial_history .get_resumed_session_sources() @@ -631,7 +632,7 @@ impl ThreadManager { options.config, options.initial_history, Arc::clone(&self.state.auth_manager), - self.agent_control(), + agent_control, session_source, /*parent_thread_id*/ None, forked_from_thread_id, @@ -710,6 +711,7 @@ impl ThreadManager { auth_manager: Arc, parent_trace: Option, ) -> CodexResult { + let agent_control = self.agent_control_for_config(&config); let environments = default_thread_environment_selections( self.state.environment_manager.as_ref(), &config.cwd, @@ -721,7 +723,7 @@ impl ThreadManager { config, initial_history, auth_manager, - self.agent_control(), + agent_control, session_source, /*parent_thread_id*/ None, /*forked_from_thread_id*/ None, @@ -743,6 +745,7 @@ impl ThreadManager { config: Config, user_shell_override: crate::shell::Shell, ) -> CodexResult { + let agent_control = self.agent_control_for_config(&config); let environments = default_thread_environment_selections( self.state.environment_manager.as_ref(), &config.cwd, @@ -751,7 +754,7 @@ impl ThreadManager { config, InitialHistory::New, Arc::clone(&self.state.auth_manager), - self.agent_control(), + agent_control, /*parent_thread_id*/ None, /*forked_from_thread_id*/ None, /*thread_source*/ None, @@ -772,6 +775,7 @@ impl ThreadManager { auth_manager: Arc, user_shell_override: crate::shell::Shell, ) -> CodexResult { + let agent_control = self.agent_control_for_config(&config); let initial_history = self.initial_history_from_rollout_path(rollout_path).await?; let environments = default_thread_environment_selections( self.state.environment_manager.as_ref(), @@ -784,7 +788,7 @@ impl ThreadManager { config, initial_history, auth_manager, - self.agent_control(), + agent_control, session_source, /*parent_thread_id*/ None, /*forked_from_thread_id*/ None, @@ -952,11 +956,12 @@ impl ThreadManager { self.state.environment_manager.as_ref(), &config.cwd, ); + let agent_control = self.agent_control_for_config(&config); Box::pin(self.state.spawn_thread( config, history, Arc::clone(&self.state.auth_manager), - self.agent_control(), + agent_control, /*parent_thread_id*/ None, forked_from_thread_id, thread_source, @@ -971,7 +976,11 @@ impl ThreadManager { } pub(crate) fn agent_control(&self) -> AgentControl { - AgentControl::new(Arc::downgrade(&self.state)) + AgentControl::new(Arc::downgrade(&self.state), /*rollout_budget*/ None) + } + + fn agent_control_for_config(&self, config: &Config) -> AgentControl { + AgentControl::new(Arc::downgrade(&self.state), config.rollout_budget) } #[cfg(test)] diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 275458ffd..0fdd958b3 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -100,6 +100,7 @@ mod resume; mod resume_warning; mod review; mod rmcp_client; +mod rollout_budget; mod rollout_list_find; mod safety_check_downgrade; mod search_tool; diff --git a/codex-rs/core/tests/suite/rollout_budget.rs b/codex-rs/core/tests/suite/rollout_budget.rs new file mode 100644 index 000000000..c2a044eb5 --- /dev/null +++ b/codex-rs/core/tests/suite/rollout_budget.rs @@ -0,0 +1,304 @@ +use anyhow::Result; +use codex_core::config::RolloutBudgetConfig; +use codex_features::Feature; +use codex_model_provider_info::built_in_model_providers; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::Op; +use core_test_support::responses::ResponsesRequest; +use core_test_support::responses::ev_assistant_message; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_completed_with_tokens; +use core_test_support::responses::ev_function_call; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_sse_once_match; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; +use core_test_support::skip_if_no_network; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use pretty_assertions::assert_eq; +use serde_json::json; +use std::time::Duration; +use tokio::time::timeout; + +const ROLLOUT_BUDGET: RolloutBudgetConfig = RolloutBudgetConfig { + limit_tokens: 100, + reminder_interval_tokens: 25, + sampling_token_weight: 1.0, + prefill_token_weight: 1.0, +}; + +fn rollout_budget_texts(request: &ResponsesRequest) -> Vec { + request + .message_input_texts("developer") + .into_iter() + .filter(|text| text.starts_with("")) + .collect() +} + +fn rollout_budget_message(remaining_tokens: i64) -> String { + format!( + "\nYou have {remaining_tokens} weighted tokens left in the shared session token budget.\n" + ) +} + +fn wire_request_contains(request: &wiremock::Request, text: &str) -> bool { + std::str::from_utf8(&request.body).is_ok_and(|body| body.contains(text)) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn adds_weighted_initial_and_periodic_reminders() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + json!({ + "type": "response.completed", + "response": { + "id": "resp-1", + "usage": { + "input_tokens": 60, + "input_tokens_details": { "cached_tokens": 40 }, + "output_tokens": 15, + "output_tokens_details": null, + "total_tokens": 75 + } + } + }), + ]), + sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]), + ], + ) + .await; + let test = test_codex() + .with_config(|config| { + config.rollout_budget = Some(RolloutBudgetConfig { + sampling_token_weight: 2.0, + prefill_token_weight: 0.5, + ..ROLLOUT_BUDGET + }); + }) + .build(&server) + .await?; + + test.submit_turn("first turn").await?; + test.submit_turn("second turn").await?; + + let requests = responses.requests(); + assert_eq!( + rollout_budget_texts(&requests[0]), + vec![rollout_budget_message(/*remaining_tokens*/ 100)] + ); + assert_eq!( + rollout_budget_texts(&requests[1]), + vec![ + rollout_budget_message(/*remaining_tokens*/ 100), + rollout_budget_message(/*remaining_tokens*/ 60), + ] + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn subagent_usage_draws_from_the_shared_budget() -> Result<()> { + skip_if_no_network!(Ok(())); + + const ROOT_PROMPT: &str = "spawn a budget worker"; + const CHILD_PROMPT: &str = "consume child budget"; + const FOLLOW_UP_PROMPT: &str = "report the shared budget"; + const SPAWN_CALL_ID: &str = "spawn-budget-worker"; + + let server = start_mock_server().await; + let spawn_args = json!({ + "message": CHILD_PROMPT, + "task_name": "budget_worker", + }) + .to_string(); + mount_sse_once_match( + &server, + |request: &wiremock::Request| wire_request_contains(request, ROOT_PROMPT), + sse(vec![ + ev_response_created("root-1"), + ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args), + ev_completed_with_tokens("root-1", /*total_tokens*/ 10), + ]), + ) + .await; + mount_sse_once_match( + &server, + |request: &wiremock::Request| wire_request_contains(request, "\"type\":\"agent_message\""), + sse(vec![ + ev_response_created("child-1"), + ev_completed_with_tokens("child-1", /*total_tokens*/ 30), + ]), + ) + .await; + mount_sse_once_match( + &server, + |request: &wiremock::Request| { + wire_request_contains(request, SPAWN_CALL_ID) + && !wire_request_contains(request, "\"type\":\"agent_message\"") + }, + sse(vec![ + ev_response_created("root-2"), + ev_completed_with_tokens("root-2", /*total_tokens*/ 10), + ]), + ) + .await; + let follow_up = mount_sse_once_match( + &server, + |request: &wiremock::Request| wire_request_contains(request, FOLLOW_UP_PROMPT), + sse(vec![ev_response_created("root-3"), ev_completed("root-3")]), + ) + .await; + + let test = test_codex() + .with_config(|config| { + config + .features + .enable(Feature::Collab) + .expect("test config should allow multi-agent tools"); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow multi-agent v2"); + config.rollout_budget = Some(ROLLOUT_BUDGET); + }) + .build(&server) + .await?; + + let mut created_threads = test.thread_manager.subscribe_thread_created(); + test.submit_turn(ROOT_PROMPT).await?; + let child_thread_id = timeout(Duration::from_secs(10), created_threads.recv()).await??; + let child_thread = test.thread_manager.get_thread(child_thread_id).await?; + wait_for_event(child_thread.as_ref(), |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + test.submit_turn(FOLLOW_UP_PROMPT).await?; + + let request = follow_up.single_request(); + assert_eq!( + rollout_budget_texts(&request).last(), + Some(&rollout_budget_message(/*remaining_tokens*/ 50)) + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn restates_the_current_remainder_after_compaction() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_completed_with_tokens("resp-1", /*total_tokens*/ 20), + ]), + sse(vec![ + ev_response_created("resp-compact"), + ev_assistant_message("msg-compact", "compact summary"), + ev_completed_with_tokens("resp-compact", /*total_tokens*/ 10), + ]), + sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]), + ], + ) + .await; + let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone(); + model_provider.name = "OpenAI-compatible test provider".to_string(); + model_provider.base_url = Some(format!("{}/v1", server.uri())); + model_provider.supports_websockets = false; + let test = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + config.rollout_budget = Some(RolloutBudgetConfig { + reminder_interval_tokens: 50, + ..ROLLOUT_BUDGET + }); + }) + .build(&server) + .await?; + + test.submit_turn("first turn").await?; + test.codex.submit(Op::Compact).await?; + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + test.submit_turn("second turn").await?; + + let requests = responses.requests(); + assert_eq!( + rollout_budget_texts(&requests[2]), + vec![rollout_budget_message(/*remaining_tokens*/ 70)], + "a new context window should restate the current remainder" + ); + let request_body = requests[2].body_json().to_string(); + let summary_position = request_body + .find("compact summary") + .expect("post-compaction request should contain the summary"); + let reminder_position = request_body + .find("You have 70 weighted tokens left in the shared session token budget.") + .expect("post-compaction request should contain the current remainder"); + assert!( + summary_position < reminder_position, + "the current remainder should follow the compaction summary" + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn restates_the_current_remainder_after_rollback() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_completed_with_tokens("resp-1", /*total_tokens*/ 30), + ]), + sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]), + ], + ) + .await; + let test = test_codex() + .with_config(|config| { + config.rollout_budget = Some(RolloutBudgetConfig { + reminder_interval_tokens: 50, + ..ROLLOUT_BUDGET + }); + }) + .build(&server) + .await?; + + test.submit_turn("rolled-back turn").await?; + test.codex + .submit(Op::ThreadRollback { num_turns: 1 }) + .await?; + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::ThreadRolledBack(_)) + }) + .await; + test.submit_turn("turn after rollback").await?; + + let requests = responses.requests(); + assert_eq!( + rollout_budget_texts(&requests[1]), + vec![rollout_budget_message(/*remaining_tokens*/ 70)], + "rollback should rearm the current budget reminder without refunding usage" + ); + + Ok(()) +}