diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index f61d22b3c..5fcd8eb07 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -624,6 +624,9 @@ "terminal_visualization_instructions": { "type": "boolean" }, + "token_budget": { + "type": "boolean" + }, "tool_call_mcp_elicitation": { "type": "boolean" }, @@ -4759,6 +4762,9 @@ "terminal_visualization_instructions": { "type": "boolean" }, + "token_budget": { + "type": "boolean" + }, "tool_call_mcp_elicitation": { "type": "boolean" }, diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 96feddf13..f611059f8 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -294,6 +294,7 @@ async fn run_compact_task_inner_impl( let user_messages = collect_user_messages(history_items); let mut new_history = build_compacted_history(Vec::new(), &user_messages, &summary_text); + let window_id = sess.advance_auto_compact_window_id().await; if matches!( initial_context_injection, @@ -310,7 +311,7 @@ async fn run_compact_task_inner_impl( let compacted_item = CompactedItem { message: summary_text.clone(), replacement_history: Some(new_history.clone()), - window_id: None, + window_id: Some(window_id), }; sess.replace_compacted_history(new_history, reference_context_item, compacted_item) .await; diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 767fdf7b0..866756f65 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -249,6 +249,7 @@ async fn run_remote_compact_task_inner_impl( turn_metadata_header.as_deref(), ) .await?; + let new_window_id = sess.advance_auto_compact_window_id().await; new_history = process_compacted_history( sess.as_ref(), turn_context.as_ref(), @@ -264,7 +265,7 @@ async fn run_remote_compact_task_inner_impl( let compacted_item = CompactedItem { message: String::new(), replacement_history: Some(new_history.clone()), - window_id: None, + window_id: Some(new_window_id), }; // Install is the semantic boundary where the compact endpoint's output becomes live // thread history. Keep it distinct from the later inference request so the reducer can diff --git a/codex-rs/core/src/compact_remote_v2.rs b/codex-rs/core/src/compact_remote_v2.rs index 9a14dcd96..56056130c 100644 --- a/codex-rs/core/src/compact_remote_v2.rs +++ b/codex-rs/core/src/compact_remote_v2.rs @@ -285,6 +285,7 @@ async fn run_remote_compact_task_inner_impl( let (compacted_history, retained_images) = build_v2_compacted_history(&prompt_input, compaction_output); analytics_details.retained_image_count = Some(retained_images); + let new_window_id = sess.advance_auto_compact_window_id().await; let new_history = process_compacted_history( sess.as_ref(), turn_context.as_ref(), @@ -300,7 +301,7 @@ async fn run_remote_compact_task_inner_impl( let compacted_item = CompactedItem { message: String::new(), replacement_history: Some(new_history.clone()), - window_id: None, + window_id: Some(new_window_id), }; compaction_trace.record_installed(&CompactionCheckpointTracePayload { input_history: &trace_input_history, diff --git a/codex-rs/core/src/context/mod.rs b/codex-rs/core/src/context/mod.rs index 78fc70fd4..d59d6e870 100644 --- a/codex-rs/core/src/context/mod.rs +++ b/codex-rs/core/src/context/mod.rs @@ -23,6 +23,7 @@ mod realtime_end_instructions; mod realtime_start_instructions; mod realtime_start_with_instructions; mod subagent_notification; +mod token_budget_context; mod turn_aborted; mod user_instructions; mod user_shell_command; @@ -60,6 +61,8 @@ pub(crate) use realtime_end_instructions::RealtimeEndInstructions; pub(crate) use realtime_start_instructions::RealtimeStartInstructions; pub(crate) use realtime_start_with_instructions::RealtimeStartWithInstructions; pub(crate) use subagent_notification::SubagentNotification; +pub(crate) use token_budget_context::TokenBudgetContext; +pub(crate) use token_budget_context::TokenBudgetRemainingContext; pub(crate) use turn_aborted::TurnAborted; pub(crate) use user_instructions::UserInstructions; pub(crate) use user_shell_command::UserShellCommand; diff --git a/codex-rs/core/src/context/token_budget_context.rs b/codex-rs/core/src/context/token_budget_context.rs new file mode 100644 index 000000000..d13c40448 --- /dev/null +++ b/codex-rs/core/src/context/token_budget_context.rs @@ -0,0 +1,68 @@ +use super::ContextualUserFragment; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct TokenBudgetContext { + window_id: u64, + tokens_left: i64, +} + +impl TokenBudgetContext { + pub(crate) fn new(window_id: u64, tokens_left: i64) -> Self { + Self { + window_id, + tokens_left, + } + } +} + +impl ContextualUserFragment for TokenBudgetContext { + fn role(&self) -> &'static str { + "developer" + } + + fn markers(&self) -> (&'static str, &'static str) { + Self::type_markers() + } + + fn type_markers() -> (&'static str, &'static str) { + ("\n", "\n") + } + + fn body(&self) -> String { + let window_id = self.window_id; + let tokens_left = self.tokens_left; + format!( + "Current context window {window_id}.\nYou have {tokens_left} tokens left in this context window." + ) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct TokenBudgetRemainingContext { + tokens_left: i64, +} + +impl TokenBudgetRemainingContext { + pub(crate) fn new(tokens_left: i64) -> Self { + Self { tokens_left } + } +} + +impl ContextualUserFragment for TokenBudgetRemainingContext { + fn role(&self) -> &'static str { + "developer" + } + + fn markers(&self) -> (&'static str, &'static str) { + Self::type_markers() + } + + fn type_markers() -> (&'static str, &'static str) { + ("\n", "\n") + } + + fn body(&self) -> String { + let tokens_left = self.tokens_left; + format!("You have {tokens_left} tokens left in this context window.") + } +} diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index d3cae7fee..bdcd78ad6 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -30,6 +30,7 @@ const CONTEXTUAL_DEVELOPER_PREFIXES: &[&str] = &[ COLLABORATION_MODE_OPEN_TAG, REALTIME_CONVERSATION_OPEN_TAG, "", + "", ]; pub(crate) fn is_contextual_user_message_content(message: &[ContentItem]) -> bool { diff --git a/codex-rs/core/src/event_mapping_tests.rs b/codex-rs/core/src/event_mapping_tests.rs index 5f2ed6683..ab8cbdd4a 100644 --- a/codex-rs/core/src/event_mapping_tests.rs +++ b/codex-rs/core/src/event_mapping_tests.rs @@ -1,3 +1,5 @@ +use super::has_non_contextual_dev_message_content; +use super::is_contextual_dev_message_content; use super::parse_turn_item; use crate::context::ContextualUserFragment; use crate::context::InternalContextSource; @@ -16,6 +18,17 @@ use codex_protocol::models::WebSearchAction; use codex_protocol::user_input::UserInput; use pretty_assertions::assert_eq; +#[test] +fn recognizes_token_budget_as_contextual_developer_content() { + let content = vec![ContentItem::InputText { + text: "\nYou have 710 tokens left in this context window.\n" + .to_string(), + }]; + + assert!(is_contextual_dev_message_content(&content)); + assert!(!has_non_contextual_dev_message_content(&content)); +} + #[test] fn parses_user_message_with_text_and_two_images() { let img1 = "https://example.com/one.png".to_string(); diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 68d2ceaeb..0c9dfeccb 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -207,6 +207,7 @@ mod review; mod rollout_reconstruction; #[allow(clippy::module_inception)] pub(crate) mod session; +mod token_budget; pub(crate) mod turn; pub(crate) mod turn_context; use self::config_lock::export_config_lock_if_configured; @@ -2712,15 +2713,13 @@ impl Session { &self, items: Vec, reference_context_item: Option, - mut compacted_item: CompactedItem, + compacted_item: CompactedItem, ) { { let mut state = self.state.lock().await; state.replace_history(items, reference_context_item.clone()); } - compacted_item.window_id = Some(self.advance_auto_compact_window_id().await); - self.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) .await; if let Some(turn_context_item) = reference_context_item { @@ -2805,6 +2804,7 @@ impl Session { collaboration_mode, base_instructions, session_source, + auto_compact_window_id, ) = { let state = self.state.lock().await; ( @@ -2813,6 +2813,7 @@ impl Session { state.session_configuration.collaboration_mode.clone(), state.session_configuration.base_instructions.clone(), state.session_configuration.session_source.clone(), + state.auto_compact_window_id(), ) }; if let Some(model_switch_message) = @@ -2962,6 +2963,18 @@ impl Session { .render(), ); } + // This is full-context metadata. Steady-state context diffs should not re-emit it. + if turn_context.features.enabled(Feature::TokenBudget) + && let Some(model_context_window) = turn_context.model_context_window() + { + developer_sections.push( + crate::context::TokenBudgetContext::new( + auto_compact_window_id, + model_context_window, + ) + .render(), + ); + } if turn_context.config.include_environment_context { let shell = self.user_shell(); let subagents = self @@ -3040,7 +3053,7 @@ impl Session { format!("{thread_id}:{window_id}") } - async fn advance_auto_compact_window_id(&self) -> u64 { + pub(crate) async fn advance_auto_compact_window_id(&self) -> u64 { let mut state = self.state.lock().await; state.advance_auto_compact_window_id() } diff --git a/codex-rs/core/src/session/token_budget.rs b/codex-rs/core/src/session/token_budget.rs new file mode 100644 index 000000000..b8f9f82fb --- /dev/null +++ b/codex-rs/core/src/session/token_budget.rs @@ -0,0 +1,44 @@ +use super::session::Session; +use super::turn_context::TurnContext; +use crate::context::ContextualUserFragment; +use codex_features::Feature; + +const TOKEN_BUDGET_USAGE_THRESHOLDS: [i64; 3] = [25, 50, 75]; + +pub(super) async fn maybe_record_token_budget_remaining_context( + sess: &Session, + turn_context: &TurnContext, + tokens_before_sampling: i64, + tokens_after_sampling: i64, +) { + if !turn_context.features.enabled(Feature::TokenBudget) { + return; + } + let Some(model_context_window) = turn_context.model_context_window() else { + return; + }; + if model_context_window <= 0 || tokens_after_sampling <= tokens_before_sampling { + return; + } + + let tokens_before_sampling = tokens_before_sampling.max(0); + let tokens_after_sampling = tokens_after_sampling.max(0); + let crossed_threshold = TOKEN_BUDGET_USAGE_THRESHOLDS.iter().any(|threshold| { + tokens_before_sampling.saturating_mul(100) < model_context_window.saturating_mul(*threshold) + && tokens_after_sampling.saturating_mul(100) + >= model_context_window.saturating_mul(*threshold) + }); + if !crossed_threshold { + return; + } + + let tokens_left = model_context_window + .saturating_sub(tokens_after_sampling) + .max(0); + + let response_item = ContextualUserFragment::into( + crate::context::TokenBudgetRemainingContext::new(tokens_left), + ); + sess.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) + .await; +} diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index f65ada611..85200d740 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -225,6 +225,7 @@ pub(crate) async fn run_turn( let turn_metadata_header = turn_context .turn_metadata_state .current_header_value_for_model_request(&window_id); + let tokens_before_sampling = sess.get_total_token_usage().await; match run_sampling_request( Arc::clone(&sess), Arc::clone(&turn_context), @@ -275,6 +276,15 @@ pub(crate) async fn run_turn( "post sampling token usage" ); + let tokens_after_sampling = token_status.active_context_tokens; + super::token_budget::maybe_record_token_budget_remaining_context( + sess.as_ref(), + turn_context.as_ref(), + tokens_before_sampling, + tokens_after_sampling, + ) + .await; + // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. if token_limit_reached && needs_follow_up { if let Err(err) = run_auto_compact( diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 234fb8a2f..5764068bd 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -110,6 +110,7 @@ mod sqlite_state; mod stream_error_allows_next_turn; mod stream_no_completed; mod subagent_notifications; +mod token_budget; mod tool_harness; mod tool_parallelism; mod tools; diff --git a/codex-rs/core/tests/suite/token_budget.rs b/codex-rs/core/tests/suite/token_budget.rs new file mode 100644 index 000000000..92c2fa929 --- /dev/null +++ b/codex-rs/core/tests/suite/token_budget.rs @@ -0,0 +1,222 @@ +use anyhow::Result; +use codex_features::Feature; +use codex_model_provider_info::built_in_model_providers; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::Op; +use core_test_support::PathBufExt; +use core_test_support::responses::ResponsesRequest; +use core_test_support::responses::ev_assistant_message; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_completed_with_tokens; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; +use core_test_support::skip_if_no_network; +use core_test_support::test_codex::local; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use pretty_assertions::assert_eq; + +const CONFIGURED_CONTEXT_WINDOW: i64 = 128_000; +const EFFECTIVE_CONTEXT_WINDOW: i64 = CONFIGURED_CONTEXT_WINDOW * 95 / 100; + +fn token_budget_texts(request: &ResponsesRequest) -> Vec { + request + .message_input_texts("developer") + .into_iter() + .filter(|text| text.starts_with("")) + .collect() +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn token_budget_context_is_only_emitted_with_full_context() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), + sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]), + ], + ) + .await; + let test = test_codex() + .with_config(|config| { + config.model_context_window = Some(CONFIGURED_CONTEXT_WINDOW); + config + .features + .enable(Feature::TokenBudget) + .expect("test config should allow token budget"); + }) + .build(&server) + .await?; + + test.submit_turn("first turn").await?; + + let second_cwd = test.workspace_path("second-cwd"); + std::fs::create_dir_all(&second_cwd)?; + test.submit_turn_with_environments("second turn", Some(vec![local(second_cwd.abs())])) + .await?; + + let requests = responses.requests(); + assert_eq!(requests.len(), 2); + + let expected = vec![format!( + "\nCurrent context window 0.\nYou have {EFFECTIVE_CONTEXT_WINDOW} tokens left in this context window.\n" + )]; + assert_eq!( + token_budget_texts(&requests[0]), + expected, + "initial full context should report context window 0" + ); + assert_eq!( + token_budget_texts(&requests[1]), + expected, + "steady-state context update should not advance the context window" + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn token_budget_remaining_context_emits_on_first_threshold_crossing() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_completed_with_tokens("resp-1", /*total_tokens*/ 2_500), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_completed_with_tokens("resp-2", /*total_tokens*/ 3_000), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_completed_with_tokens("resp-3", /*total_tokens*/ 5_000), + ]), + sse(vec![ + ev_response_created("resp-4"), + ev_completed_with_tokens("resp-4", /*total_tokens*/ 8_000), + ]), + sse(vec![ev_response_created("resp-5"), ev_completed("resp-5")]), + ], + ) + .await; + let test = test_codex() + .with_config(|config| { + config.model_context_window = Some(10_000); + config + .features + .enable(Feature::TokenBudget) + .expect("test config should allow token budget"); + }) + .build(&server) + .await?; + + for turn in 1..=5 { + test.submit_turn(&format!("turn {turn}")).await?; + } + + let requests = responses.requests(); + assert_eq!(requests.len(), 5); + + let full_context = "\nCurrent context window 0.\nYou have 9500 tokens left in this context window.\n" + .to_string(); + let threshold_25 = + "\nYou have 7000 tokens left in this context window.\n" + .to_string(); + let threshold_50 = + "\nYou have 4500 tokens left in this context window.\n" + .to_string(); + let threshold_75 = + "\nYou have 1500 tokens left in this context window.\n" + .to_string(); + + assert_eq!(token_budget_texts(&requests[0]), vec![full_context.clone()]); + assert_eq!( + token_budget_texts(&requests[1]), + vec![full_context.clone(), threshold_25.clone()] + ); + assert_eq!( + token_budget_texts(&requests[2]), + vec![full_context.clone(), threshold_25.clone()] + ); + assert_eq!( + token_budget_texts(&requests[3]), + vec![ + full_context.clone(), + threshold_25.clone(), + threshold_50.clone() + ] + ); + assert_eq!( + token_budget_texts(&requests[4]), + vec![full_context, threshold_25, threshold_50, threshold_75] + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn token_budget_context_uses_new_window_after_compaction() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), + sse(vec![ + ev_response_created("resp-compact"), + ev_assistant_message("msg-compact", "compact summary"), + ev_completed("resp-compact"), + ]), + sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]), + ], + ) + .await; + + let mut model_provider = built_in_model_providers(/*openai_base_url*/ None)["openai"].clone(); + model_provider.name = "OpenAI-compatible test provider".to_string(); + model_provider.base_url = Some(format!("{}/v1", server.uri())); + model_provider.supports_websockets = false; + + let test = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + config.model_context_window = Some(CONFIGURED_CONTEXT_WINDOW); + config + .features + .enable(Feature::TokenBudget) + .expect("test config should allow token budget"); + }) + .build(&server) + .await?; + + test.submit_turn("before compact").await?; + test.codex.submit(Op::Compact).await?; + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + test.submit_turn("after compact").await?; + + let requests = responses.requests(); + assert_eq!(requests.len(), 3); + + assert_eq!( + token_budget_texts(&requests[2]), + vec![format!( + "\nCurrent context window 1.\nYou have {EFFECTIVE_CONTEXT_WINDOW} tokens left in this context window.\n" + )], + "post-compaction full context should report context window 1" + ); + + Ok(()) +} diff --git a/codex-rs/features/src/lib.rs b/codex-rs/features/src/lib.rs index 3159d50a3..afb5bbac2 100644 --- a/codex-rs/features/src/lib.rs +++ b/codex-rs/features/src/lib.rs @@ -197,6 +197,8 @@ pub enum Feature { GuardianApproval, /// Enable persisted thread goals and automatic goal continuation. Goals, + /// Add current context-window metadata to model-visible context. + TokenBudget, /// Route MCP tool approval prompts through the MCP elicitation request path. ToolCallMcpElicitation, /// Prompt Codex Apps connector auth failures through MCP URL elicitations. @@ -1142,6 +1144,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::Stable, default_enabled: true, }, + FeatureSpec { + id: Feature::TokenBudget, + key: "token_budget", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::CollaborationModes, key: "collaboration_modes",