diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs
index 01f0e1415..410c97cbd 100644
--- a/codex-rs/core/src/agent/control.rs
+++ b/codex-rs/core/src/agent/control.rs
@@ -6,7 +6,9 @@ use crate::agent::role::resolve_role_config;
 use crate::agent::status::is_final;
 use crate::codex_thread::ThreadConfigSnapshot;
 use crate::config::Config;
+use crate::config::RolloutBudgetConfig;
 use crate::environment_selection::TurnEnvironmentSnapshot;
+use crate::rollout_budget::RolloutBudget;
 use crate::session::emit_subagent_session_started;
 use crate::session_prefix::format_inter_agent_completion_message;
 use crate::session_prefix::format_subagent_context_line;
@@ -100,15 +102,27 @@ pub(crate) struct AgentControl {
     state: Arc,
     v2_residency: Arc,
     agent_execution_limiter: Arc,
+    /// Session-scoped state shared by the root thread and every cloned sub-agent control handle.
+    rollout_budget: Arc<RolloutBudget>,
 }
 
 impl AgentControl {
     /// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
+    ///
+    /// When `rollout_budget` is `Some`, the shared rollout budget is configured with it; `None`
+    /// leaves the budget unconfigured.
-    pub(crate) fn new(manager: Weak) -> Self {
-        Self {
+    pub(crate) fn new(
+        manager: Weak,
+        rollout_budget: Option<RolloutBudgetConfig>,
+    ) -> Self {
+        let control = Self {
             manager,
             ..Default::default()
+        };
+        if let Some(rollout_budget) = rollout_budget {
+            control.rollout_budget.configure(rollout_budget);
         }
+        control
     }
 
     pub(crate) fn with_session_id(mut self, session_id: SessionId, max_threads: usize) -> Self {
@@ -121,6 +135,11 @@ impl AgentControl {
         self.session_id
     }
 
+    /// Borrows the rollout budget held by this control handle.
+    pub(crate) fn rollout_budget(&self) -> &RolloutBudget {
+        self.rollout_budget.as_ref()
+    }
+
     /// Send rich user input items to an existing agent thread.
     pub(crate) async fn send_input(
         &self,
diff --git a/codex-rs/core/src/compact_remote_v2.rs b/codex-rs/core/src/compact_remote_v2.rs
index 6c3a93902..f9c2ddc71 100644
--- a/codex-rs/core/src/compact_remote_v2.rs
+++ b/codex-rs/core/src/compact_remote_v2.rs
@@ -284,6 +284,7 @@ async fn run_remote_compact_task_inner_impl(
         token_usage,
     } = compaction_output_result?;
     if let Some(token_usage) = token_usage {
+        sess.record_rollout_budget_usage(&token_usage);
         analytics_details.active_context_tokens_before = Some(token_usage.input_tokens);
         analytics_details.compaction_summary_tokens = Some(token_usage.output_tokens);
         analytics_details.cached_input_tokens = Some(token_usage.cached_input_tokens);
diff --git a/codex-rs/core/src/context/mod.rs b/codex-rs/core/src/context/mod.rs
index a1f204e47..7fc137578 100644
--- a/codex-rs/core/src/context/mod.rs
+++ b/codex-rs/core/src/context/mod.rs
@@ -24,6 +24,7 @@ mod realtime_end_instructions;
 mod realtime_start_instructions;
 mod realtime_start_with_instructions;
 mod recommended_plugins_instructions;
+mod rollout_budget;
 mod subagent_notification;
 mod token_budget_context;
 mod turn_aborted;
@@ -64,6 +65,7 @@ pub(crate) use realtime_end_instructions::RealtimeEndInstructions;
 pub(crate) use realtime_start_instructions::RealtimeStartInstructions;
 pub(crate) use realtime_start_with_instructions::RealtimeStartWithInstructions;
 pub(crate) use recommended_plugins_instructions::RecommendedPluginsInstructions;
+pub(crate) use rollout_budget::RolloutBudgetContext;
 pub(crate) use subagent_notification::SubagentNotification;
 pub(crate) use token_budget_context::TokenBudgetContext;
 pub(crate) use token_budget_context::TokenBudgetRemainingContext;
diff --git a/codex-rs/core/src/context/rollout_budget.rs b/codex-rs/core/src/context/rollout_budget.rs
new file mode 100644
index 000000000..33ed724b8
--- /dev/null
+++ b/codex-rs/core/src/context/rollout_budget.rs
@@ -0,0 +1,29 @@
+use super::ContextualUserFragment;
+
+/// Developer-role context fragment stating how many weighted tokens remain in the shared
+/// session token budget.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(crate) struct RolloutBudgetContext {
+    pub(crate) remaining_tokens: i64,
+}
+
+impl ContextualUserFragment for RolloutBudgetContext {
+    fn role(&self) -> &'static str {
+        "developer"
+    }
+
+    fn markers(&self) -> (&'static str, &'static str) {
+        Self::type_markers()
+    }
+
+    fn type_markers() -> (&'static str, &'static str) {
+        ("\n", "\n")
+    }
+
+    fn body(&self) -> String {
+        format!(
+            "You have {} weighted tokens left in the shared session token budget.",
+            self.remaining_tokens
+        )
+    }
+}
diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs
index 9e96ebff5..73193984b 100644
--- a/codex-rs/core/src/event_mapping.rs
+++ b/codex-rs/core/src/event_mapping.rs
@@ -33,6 +33,7 @@ const CONTEXTUAL_DEVELOPER_PREFIXES: &[&str] = &[
     SKILLS_INSTRUCTIONS_OPEN_TAG,
     "",
     "",
+    "",
 ];
 
 pub(crate) fn is_contextual_user_message_content(message: &[ContentItem]) -> bool {
diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs
index 0641d7226..0e193baa1 100644
--- a/codex-rs/core/src/lib.rs
+++ b/codex-rs/core/src/lib.rs
@@ -131,6 +131,7 @@ pub use agents_md::DEFAULT_AGENTS_MD_FILENAME;
 pub use agents_md::LOCAL_AGENTS_MD_FILENAME;
 pub use agents_md::LoadedAgentsMd;
 mod rollout;
+mod rollout_budget;
 pub(crate) mod safety;
 mod session_rollout_init_error;
 pub mod shell;
diff --git a/codex-rs/core/src/rollout_budget.rs b/codex-rs/core/src/rollout_budget.rs
new file mode 100644
index 000000000..accabac76
--- /dev/null
+++ b/codex-rs/core/src/rollout_budget.rs
@@ -0,0 +1,126 @@
+use crate::config::RolloutBudgetConfig;
+use codex_protocol::ThreadId;
+use codex_protocol::protocol::TokenUsage;
+use std::collections::HashMap;
+use std::sync::Mutex;
+use std::sync::MutexGuard;
+use std::sync::OnceLock;
+
+/// A reminder that is due for one thread, as computed by `RolloutBudget::pending_reminder`.
+pub(crate) struct RolloutBudgetReminder {
+    /// Weighted tokens left in the budget when the reminder was computed; never negative.
+    pub(crate) remaining_tokens: i64,
+    /// Number of whole reminder intervals consumed when the reminder was computed.
+    reminder_index: i64,
+}
+
+/// Shared accounting and reminder state for one root-thread session tree.
+///
+/// Until `configure` has run, every other method does nothing (or returns `None`).
+#[derive(Default)]
+pub(crate) struct RolloutBudget {
+    state: OnceLock<Mutex<RolloutBudgetState>>,
+}
+
+struct RolloutBudgetState {
+    config: RolloutBudgetConfig,
+    weighted_tokens_used: f64,
+    /// Last reminder delivered to each thread, so every thread observes crossed thresholds.
+    deliveries: HashMap<ThreadId, ThreadBudgetDelivery>,
+}
+
+struct ThreadBudgetDelivery {
+    window_id: String,
+    reminder_index: i64,
+}
+
+impl RolloutBudget {
+    /// Installs `config` with zero recorded usage; only the first call has any effect.
+    pub(crate) fn configure(&self, config: RolloutBudgetConfig) {
+        self.state.get_or_init(|| {
+            Mutex::new(RolloutBudgetState {
+                config,
+                weighted_tokens_used: 0.0,
+                deliveries: HashMap::new(),
+            })
+        });
+    }
+
+    /// Adds `usage` to the running total: output tokens (clamped at zero) times
+    /// `sampling_token_weight`, plus non-cached input tokens times `prefill_token_weight`.
+    pub(crate) fn record_usage(&self, usage: &TokenUsage) {
+        let Some(mut state) = self.lock() else {
+            return;
+        };
+        state.weighted_tokens_used += usage.output_tokens.max(0) as f64
+            * state.config.sampling_token_weight
+            + usage.non_cached_input() as f64 * state.config.prefill_token_weight;
+    }
+
+    /// Returns the reminder `thread_id` should receive next, or `None` when the budget is
+    /// unconfigured or the thread has already been told about the current interval in
+    /// `window_id`.
+    pub(crate) fn pending_reminder(
+        &self,
+        thread_id: ThreadId,
+        window_id: &str,
+    ) -> Option<RolloutBudgetReminder> {
+        let state = self.lock()?;
+        let interval = state.config.reminder_interval_tokens as f64;
+        // A non-positive interval would turn the division into NaN, an infinity, or a negative
+        // index; pin the index at 0 so such a config still gets a well-defined initial reminder.
+        let reminder_index = if interval > 0.0 {
+            (state.weighted_tokens_used / interval).floor() as i64
+        } else {
+            0
+        };
+        if state.deliveries.get(&thread_id).is_some_and(|delivery| {
+            delivery.window_id.as_str() == window_id && delivery.reminder_index >= reminder_index
+        }) {
+            return None;
+        }
+        Some(RolloutBudgetReminder {
+            remaining_tokens: (state.config.limit_tokens as f64 - state.weighted_tokens_used)
+                .max(0.0)
+                .floor() as i64,
+            reminder_index,
+        })
+    }
+
+    /// Records that `reminder` reached `thread_id` in `window_id`, replacing any earlier record.
+    pub(crate) fn mark_reminder_delivered(
+        &self,
+        thread_id: ThreadId,
+        window_id: &str,
+        reminder: RolloutBudgetReminder,
+    ) {
+        // Mark delivery only after history insertion; cancellation before then should retry it.
+        let Some(mut state) = self.lock() else {
+            return;
+        };
+        state.deliveries.insert(
+            thread_id,
+            ThreadBudgetDelivery {
+                window_id: window_id.to_string(),
+                reminder_index: reminder.reminder_index,
+            },
+        );
+    }
+
+    /// Forces the next sampling request for `thread_id` to restate the current remainder.
+    pub(crate) fn rearm_reminder(&self, thread_id: ThreadId) {
+        let Some(mut state) = self.lock() else {
+            return;
+        };
+        state.deliveries.remove(&thread_id);
+    }
+
+    /// Locks the state if configured, recovering the guard even when the mutex is poisoned.
+    fn lock(&self) -> Option<MutexGuard<'_, RolloutBudgetState>> {
+        self.state.get().map(|state| {
+            state
+                .lock()
+                .unwrap_or_else(std::sync::PoisonError::into_inner)
+        })
+    }
+}
diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs
index 4ed79d04e..5299747f3 100644
--- a/codex-rs/core/src/session/handlers.rs
+++ b/codex-rs/core/src/session/handlers.rs
@@ -525,6 +525,10 @@ pub async fn thread_rollback(sess: &Arc, sub_id: String, num_turns: u32
         .collect::>();
     sess.apply_rollout_reconstruction(turn_context.as_ref(), replay_items.as_slice())
         .await;
+    sess.services
+        .agent_control
+        .rollout_budget()
+        .rearm_reminder(sess.thread_id());
     sess.recompute_token_usage(turn_context.as_ref()).await;
 
     sess.persist_rollout_items(&[RolloutItem::EventMsg(rollback_msg.clone())])
diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs
index 84e60a3d2..5aa56b68f 100644
--- a/codex-rs/core/src/session/mod.rs
+++ b/codex-rs/core/src/session/mod.rs
@@ -211,6 +211,7 @@ mod input_queue;
 mod mcp;
 mod multi_agents;
 mod review;
+mod rollout_budget;
 mod rollout_reconstruction;
 #[allow(clippy::module_inception)]
 pub(crate) mod session;
@@ -3360,6 +3361,7 @@ impl Session {
             }
             state.token_info()
         };
+        self.record_rollout_budget_usage(token_usage);
         if let Some(token_info) = token_info.as_ref() {
             for contributor in self.services.extensions.token_usage_contributors() {
                 contributor
diff --git a/codex-rs/core/src/session/rollout_budget.rs b/codex-rs/core/src/session/rollout_budget.rs
new file mode 100644
index 000000000..4627d95ee
--- /dev/null
+++ b/codex-rs/core/src/session/rollout_budget.rs
@@ -0,0 +1,33 @@
+use super::session::Session;
+use super::turn_context::TurnContext;
+use crate::context::ContextualUserFragment;
+use codex_protocol::protocol::TokenUsage;
+
+/// Appends a rollout-budget reminder to the conversation when one is due for this thread.
+///
+/// Does nothing when the shared budget reports no pending reminder for `window_id`.
+pub(super) async fn maybe_record_reminder(
+    sess: &Session,
+    turn_context: &TurnContext,
+    window_id: &str,
+) {
+    let budget = sess.services.agent_control.rollout_budget();
+    if let Some(reminder) = budget.pending_reminder(sess.thread_id(), window_id) {
+        let fragment = crate::context::RolloutBudgetContext {
+            remaining_tokens: reminder.remaining_tokens,
+        };
+        let response_item = ContextualUserFragment::into(fragment);
+        sess.record_conversation_items(turn_context, std::slice::from_ref(&response_item))
+            .await;
+        // Delivery is recorded only once the item has been handed to the conversation history.
+        budget.mark_reminder_delivered(sess.thread_id(), window_id, reminder);
+    }
+}
+
+impl Session {
+    /// Charges `usage` against the rollout budget reachable through this session's agent control.
+    pub(crate) fn record_rollout_budget_usage(&self, usage: &TokenUsage) {
+        let budget = self.services.agent_control.rollout_budget();
+        budget.record_usage(usage);
+    }
+}
diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs
index 257e74120..657e00ff3 100644
--- a/codex-rs/core/src/session/turn.rs
+++ b/codex-rs/core/src/session/turn.rs
@@ -218,6 +218,14 @@ pub(crate) async fn run_turn(
             break;
         }
 
+        let window_id = sess.current_window_id().await;
+        super::rollout_budget::maybe_record_reminder(
+            sess.as_ref(),
+            turn_context.as_ref(),
+            &window_id,
+        )
+        .await;
+
         // Construct the input that we will send to the model.
         let sampling_request_input: Vec = async {
             sess.clone_history()
@@ -227,7 +235,6 @@ pub(crate) async fn run_turn(
         .instrument(trace_span!("run_turn.prepare_sampling_request_input"))
         .await;
 
-        let window_id = sess.current_window_id().await;
         let responses_metadata = turn_context.turn_metadata_state.to_responses_metadata(
             sess.installation_id.clone(),
             window_id,
diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs
index 63335c012..b452dc45a 100644
--- a/codex-rs/core/src/thread_manager.rs
+++ b/codex-rs/core/src/thread_manager.rs
@@ -621,6 +621,7 @@ impl ThreadManager {
         options: StartThreadOptions,
         forked_from_thread_id: Option,
     ) -> CodexResult {
+        let agent_control = self.agent_control_for_config(&options.config);
         let (resumed_session_source, resumed_thread_source) = options
             .initial_history
             .get_resumed_session_sources()
@@ -631,7 +632,7 @@ impl ThreadManager {
             options.config,
             options.initial_history,
             Arc::clone(&self.state.auth_manager),
-            self.agent_control(),
+            agent_control,
             session_source,
             /*parent_thread_id*/ None,
             forked_from_thread_id,
@@ -710,6 +711,7 @@ impl ThreadManager {
         auth_manager: Arc,
         parent_trace: Option,
     ) -> CodexResult {
+        let agent_control = self.agent_control_for_config(&config);
         let environments = default_thread_environment_selections(
             self.state.environment_manager.as_ref(),
             &config.cwd,
@@ -721,7 +723,7 @@ impl ThreadManager {
             config,
             initial_history,
             auth_manager,
-            self.agent_control(),
+            agent_control,
             session_source,
             /*parent_thread_id*/ None,
             /*forked_from_thread_id*/ None,
@@ -743,6 +745,7 @@ impl ThreadManager {
         config: Config,
         user_shell_override: crate::shell::Shell,
     ) -> CodexResult {
+        let agent_control = self.agent_control_for_config(&config);
         let environments = default_thread_environment_selections(
             self.state.environment_manager.as_ref(),
             &config.cwd,
@@ -751,7 +754,7 @@ impl ThreadManager {
             config,
             InitialHistory::New,
             Arc::clone(&self.state.auth_manager),
-            self.agent_control(),
+            agent_control,
             /*parent_thread_id*/ None,
             /*forked_from_thread_id*/ None,
             /*thread_source*/ None,
@@ -772,6 +775,7 @@ impl ThreadManager {
         auth_manager: Arc,
         user_shell_override: crate::shell::Shell,
     ) -> CodexResult {
+        let agent_control = self.agent_control_for_config(&config);
         let initial_history = self.initial_history_from_rollout_path(rollout_path).await?;
         let environments = default_thread_environment_selections(
             self.state.environment_manager.as_ref(),
@@ -784,7 +788,7 @@ impl ThreadManager {
             config,
             initial_history,
             auth_manager,
-            self.agent_control(),
+            agent_control,
             session_source,
             /*parent_thread_id*/ None,
             /*forked_from_thread_id*/ None,
@@ -952,11 +956,12 @@ impl ThreadManager {
             self.state.environment_manager.as_ref(),
             &config.cwd,
         );
+        let agent_control = self.agent_control_for_config(&config);
         Box::pin(self.state.spawn_thread(
             config,
             history,
             Arc::clone(&self.state.auth_manager),
-            self.agent_control(),
+            agent_control,
             /*parent_thread_id*/ None,
             forked_from_thread_id,
             thread_source,
@@ -971,7 +976,13 @@ impl ThreadManager {
     }
 
     pub(crate) fn agent_control(&self) -> AgentControl {
-        AgentControl::new(Arc::downgrade(&self.state))
+        AgentControl::new(Arc::downgrade(&self.state), /*rollout_budget*/ None)
+    }
+
+    /// Builds a control handle whose rollout budget is configured from `config.rollout_budget`
+    /// (left unconfigured when that is `None`).
+    fn agent_control_for_config(&self, config: &Config) -> AgentControl {
+        AgentControl::new(Arc::downgrade(&self.state), config.rollout_budget)
     }
 
     #[cfg(test)]
diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs
index 275458ffd..0fdd958b3 100644
--- a/codex-rs/core/tests/suite/mod.rs
+++ b/codex-rs/core/tests/suite/mod.rs
@@ -100,6 +100,7 @@ mod resume;
 mod resume_warning;
 mod review;
 mod rmcp_client;
+mod rollout_budget;
 mod rollout_list_find;
 mod safety_check_downgrade;
 mod search_tool;
diff --git a/codex-rs/core/tests/suite/rollout_budget.rs b/codex-rs/core/tests/suite/rollout_budget.rs
new file mode 100644
index 000000000..c2a044eb5
--- /dev/null
+++ b/codex-rs/core/tests/suite/rollout_budget.rs
@@ -0,0 +1,304 @@
+use anyhow::Result;
+use codex_core::config::RolloutBudgetConfig;
+use codex_features::Feature;
+use codex_model_provider_info::built_in_model_providers;
+use codex_protocol::protocol::EventMsg;
+use codex_protocol::protocol::Op;
+use core_test_support::responses::ResponsesRequest;
+use core_test_support::responses::ev_assistant_message;
+use core_test_support::responses::ev_completed;
+use core_test_support::responses::ev_completed_with_tokens;
+use core_test_support::responses::ev_function_call;
+use core_test_support::responses::ev_response_created;
+use core_test_support::responses::mount_sse_once_match;
+use core_test_support::responses::mount_sse_sequence;
+use core_test_support::responses::sse;
+use core_test_support::responses::start_mock_server;
+use core_test_support::skip_if_no_network;
+use core_test_support::test_codex::test_codex;
+use core_test_support::wait_for_event;
+use pretty_assertions::assert_eq;
+use serde_json::json;
+use std::time::Duration;
+use tokio::time::timeout;
+
+const ROLLOUT_BUDGET: RolloutBudgetConfig = RolloutBudgetConfig { // baseline; tests override fields
+    limit_tokens: 100,
+    reminder_interval_tokens: 25,
+    sampling_token_weight: 1.0,
+    prefill_token_weight: 1.0,
+};
+
+fn rollout_budget_texts(request: &ResponsesRequest) -> Vec {
+    request
+        .message_input_texts("developer")
+        .into_iter()
+        .filter(|text| text.starts_with("")) // NOTE(review): empty prefix keeps every text — confirm
+        .collect()
+}
+
+fn rollout_budget_message(remaining_tokens: i64) -> String { // expected reminder text
+    format!(
+        "\nYou have {remaining_tokens} weighted tokens left in the shared session token budget.\n"
+    )
+}
+
+fn wire_request_contains(request: &wiremock::Request, text: &str) -> bool { // false if not UTF-8
+    std::str::from_utf8(&request.body).is_ok_and(|body| body.contains(text))
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn adds_weighted_initial_and_periodic_reminders() -> Result<()> {
+    skip_if_no_network!(Ok(()));
+
+    let server = start_mock_server().await;
+    let responses = mount_sse_sequence(
+        &server,
+        vec![
+            sse(vec![
+                ev_response_created("resp-1"),
+                json!({
+                    "type": "response.completed",
+                    "response": {
+                        "id": "resp-1",
+                        "usage": {
+                            "input_tokens": 60,
+                            "input_tokens_details": { "cached_tokens": 40 },
+                            "output_tokens": 15,
+                            "output_tokens_details": null,
+                            "total_tokens": 75
+                        }
+                    }
+                }),
+            ]),
+            sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
+        ],
+    )
+    .await;
+    let test = test_codex()
+        .with_config(|config| {
+            config.rollout_budget = Some(RolloutBudgetConfig {
+                sampling_token_weight: 2.0,
+                prefill_token_weight: 0.5,
+                ..ROLLOUT_BUDGET
+            });
+        })
+        .build(&server)
+        .await?;
+
+    test.submit_turn("first turn").await?;
+    test.submit_turn("second turn").await?;
+
+    let requests = responses.requests();
+    assert_eq!(
+        rollout_budget_texts(&requests[0]),
+        vec![rollout_budget_message(/*remaining_tokens*/ 100)] // nothing charged yet
+    );
+    assert_eq!(
+        rollout_budget_texts(&requests[1]),
+        vec![
+            rollout_budget_message(/*remaining_tokens*/ 100),
+            rollout_budget_message(/*remaining_tokens*/ 60), // 100 - (15 * 2.0 + (60 - 40) * 0.5)
+        ]
+    );
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn subagent_usage_draws_from_the_shared_budget() -> Result<()> {
+    skip_if_no_network!(Ok(()));
+
+    const ROOT_PROMPT: &str = "spawn a budget worker";
+    const CHILD_PROMPT: &str = "consume child budget";
+    const FOLLOW_UP_PROMPT: &str = "report the shared budget";
+    const SPAWN_CALL_ID: &str = "spawn-budget-worker";
+
+    let server = start_mock_server().await;
+    let spawn_args = json!({
+        "message": CHILD_PROMPT,
+        "task_name": "budget_worker",
+    })
+    .to_string();
+    mount_sse_once_match(
+        &server,
+        |request: &wiremock::Request| wire_request_contains(request, ROOT_PROMPT),
+        sse(vec![
+            ev_response_created("root-1"),
+            ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
+            ev_completed_with_tokens("root-1", /*total_tokens*/ 10),
+        ]),
+    )
+    .await;
+    mount_sse_once_match(
+        &server,
+        |request: &wiremock::Request| wire_request_contains(request, "\"type\":\"agent_message\""),
+        sse(vec![
+            ev_response_created("child-1"),
+            ev_completed_with_tokens("child-1", /*total_tokens*/ 30),
+        ]),
+    )
+    .await;
+    mount_sse_once_match(
+        &server,
+        |request: &wiremock::Request| {
+            wire_request_contains(request, SPAWN_CALL_ID)
+                && !wire_request_contains(request, "\"type\":\"agent_message\"")
+        },
+        sse(vec![
+            ev_response_created("root-2"),
+            ev_completed_with_tokens("root-2", /*total_tokens*/ 10),
+        ]),
+    )
+    .await;
+    let follow_up = mount_sse_once_match(
+        &server,
+        |request: &wiremock::Request| wire_request_contains(request, FOLLOW_UP_PROMPT),
+        sse(vec![ev_response_created("root-3"), ev_completed("root-3")]),
+    )
+    .await;
+
+    let test = test_codex()
+        .with_config(|config| {
+            config
+                .features
+                .enable(Feature::Collab)
+                .expect("test config should allow multi-agent tools");
+            config
+                .features
+                .enable(Feature::MultiAgentV2)
+                .expect("test config should allow multi-agent v2");
+            config.rollout_budget = Some(ROLLOUT_BUDGET);
+        })
+        .build(&server)
+        .await?;
+
+    let mut created_threads = test.thread_manager.subscribe_thread_created();
+    test.submit_turn(ROOT_PROMPT).await?;
+    let child_thread_id = timeout(Duration::from_secs(10), created_threads.recv()).await??;
+    let child_thread = test.thread_manager.get_thread(child_thread_id).await?;
+    wait_for_event(child_thread.as_ref(), |event| {
+        matches!(event, EventMsg::TurnComplete(_))
+    })
+    .await;
+    test.submit_turn(FOLLOW_UP_PROMPT).await?;
+
+    let request = follow_up.single_request();
+    assert_eq!(
+        rollout_budget_texts(&request).last(),
+        Some(&rollout_budget_message(/*remaining_tokens*/ 50)) // 100 - (10 + 30 + 10), presumably charged in full
+    );
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn restates_the_current_remainder_after_compaction() -> Result<()> {
+    skip_if_no_network!(Ok(()));
+
+    let server = start_mock_server().await;
+    let responses = mount_sse_sequence(
+        &server,
+        vec![
+            sse(vec![
+                ev_response_created("resp-1"),
+                ev_completed_with_tokens("resp-1", /*total_tokens*/ 20),
+            ]),
+            sse(vec![
+                ev_response_created("resp-compact"),
+                ev_assistant_message("msg-compact", "compact summary"),
+                ev_completed_with_tokens("resp-compact", /*total_tokens*/ 10),
+            ]),
+            sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
+        ],
+    )
+    .await;
+    let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone();
+    model_provider.name = "OpenAI-compatible test provider".to_string();
+    model_provider.base_url = Some(format!("{}/v1", server.uri()));
+    model_provider.supports_websockets = false;
+    let test = test_codex()
+        .with_config(move |config| {
+            config.model_provider = model_provider;
+            config.rollout_budget = Some(RolloutBudgetConfig {
+                reminder_interval_tokens: 50, // 30 tokens used stays below one interval
+                ..ROLLOUT_BUDGET
+            });
+        })
+        .build(&server)
+        .await?;
+
+    test.submit_turn("first turn").await?;
+    test.codex.submit(Op::Compact).await?;
+    wait_for_event(&test.codex, |event| {
+        matches!(event, EventMsg::TurnComplete(_))
+    })
+    .await;
+    test.submit_turn("second turn").await?;
+
+    let requests = responses.requests();
+    assert_eq!(
+        rollout_budget_texts(&requests[2]),
+        vec![rollout_budget_message(/*remaining_tokens*/ 70)], // 100 - (20 + 10), presumably charged in full
+        "a new context window should restate the current remainder"
+    );
+    let request_body = requests[2].body_json().to_string();
+    let summary_position = request_body
+        .find("compact summary")
+        .expect("post-compaction request should contain the summary");
+    let reminder_position = request_body
+        .find("You have 70 weighted tokens left in the shared session token budget.")
+        .expect("post-compaction request should contain the current remainder");
+    assert!(
+        summary_position < reminder_position,
+        "the current remainder should follow the compaction summary"
+    );
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn restates_the_current_remainder_after_rollback() -> Result<()> {
+    skip_if_no_network!(Ok(()));
+
+    let server = start_mock_server().await;
+    let responses = mount_sse_sequence(
+        &server,
+        vec![
+            sse(vec![
+                ev_response_created("resp-1"),
+                ev_completed_with_tokens("resp-1", /*total_tokens*/ 30),
+            ]),
+            sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
+        ],
+    )
+    .await;
+    let test = test_codex()
+        .with_config(|config| {
+            config.rollout_budget = Some(RolloutBudgetConfig {
+                reminder_interval_tokens: 50, // 30 tokens used stays below one interval
+                ..ROLLOUT_BUDGET
+            });
+        })
+        .build(&server)
+        .await?;
+
+    test.submit_turn("rolled-back turn").await?;
+    test.codex
+        .submit(Op::ThreadRollback { num_turns: 1 })
+        .await?;
+    wait_for_event(&test.codex, |event| {
+        matches!(event, EventMsg::ThreadRolledBack(_))
+    })
+    .await;
+    test.submit_turn("turn after rollback").await?;
+
+    let requests = responses.requests();
+    assert_eq!(
+        rollout_budget_texts(&requests[1]),
+        vec![rollout_budget_message(/*remaining_tokens*/ 70)], // 100 - 30: rolled-back usage still counts
+        "rollback should rearm the current budget reminder without refunding usage"
+    );
+
+    Ok(())
+}