[codex] surface rollout budget exhaustion (#29715)

## Summary
- surface shared rollout-budget exhaustion as
`CodexErr::RolloutBudgetExceeded` instead of a generic interrupted turn
- map it through the existing `CodexErrorInfo` and app-server v2
`codexErrorInfo` path
- keep local compaction from retrying after the shared rollout budget is
exhausted

This gives app-server clients a stable `rolloutBudgetExceeded` error
they can classify without guessing from `status="interrupted"`.

## Tests
- `just test -p codex-core rollout_budget`
This commit is contained in:
rka-oai
2026-06-23 15:01:28 -07:00
committed by GitHub
Unverified
parent be0dfcfbea
commit bbbea91960
26 changed files with 60 additions and 26 deletions
+2
View File
@@ -139,6 +139,7 @@ impl TurnCodexErrorFact {
#[serde(rename_all = "snake_case")]
pub enum CodexErrKind {
TurnAborted,
RolloutBudgetExceeded,
Stream,
ContextWindowExceeded,
ThreadNotFound,
@@ -195,6 +196,7 @@ impl From<&CodexErr> for CodexErrKind {
fn from(error: &CodexErr) -> Self {
match error {
CodexErr::TurnAborted => CodexErrKind::TurnAborted,
CodexErr::RolloutBudgetExceeded => CodexErrKind::RolloutBudgetExceeded,
CodexErr::Stream(..) => CodexErrKind::Stream,
CodexErr::ContextWindowExceeded => CodexErrKind::ContextWindowExceeded,
CodexErr::ThreadNotFound(_) => CodexErrKind::ThreadNotFound,
@@ -573,6 +573,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -7070,6 +7070,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -3310,6 +3310,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -7,6 +7,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -30,6 +30,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -111,6 +111,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -33,6 +33,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -33,6 +33,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -33,6 +33,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -111,6 +111,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -33,6 +33,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -111,6 +111,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -33,6 +33,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -33,6 +33,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -30,6 +30,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -30,6 +30,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -30,6 +30,7 @@
{
"enum": [
"contextWindowExceeded",
"rolloutBudgetExceeded",
"usageLimitExceeded",
"serverOverloaded",
"cyberPolicy",
@@ -9,4 +9,4 @@ import type { NonSteerableTurnKind } from "./NonSteerableTurnKind";
* When an upstream HTTP status is available (for example, from the Responses API or a provider),
* it is forwarded in `httpStatusCode` on the relevant `codexErrorInfo` variant.
*/
export type CodexErrorInfo = "contextWindowExceeded" | "usageLimitExceeded" | "serverOverloaded" | "cyberPolicy" | { "httpConnectionFailed": { httpStatusCode: number | null, } } | { "responseStreamConnectionFailed": { httpStatusCode: number | null, } } | "internalServerError" | "unauthorized" | "badRequest" | "threadRollbackFailed" | "sandboxError" | { "responseStreamDisconnected": { httpStatusCode: number | null, } } | { "responseTooManyFailedAttempts": { httpStatusCode: number | null, } } | { "activeTurnNotSteerable": { turnKind: NonSteerableTurnKind, } } | "other";
export type CodexErrorInfo = "contextWindowExceeded" | "rolloutBudgetExceeded" | "usageLimitExceeded" | "serverOverloaded" | "cyberPolicy" | { "httpConnectionFailed": { httpStatusCode: number | null, } } | { "responseStreamConnectionFailed": { httpStatusCode: number | null, } } | "internalServerError" | "unauthorized" | "badRequest" | "threadRollbackFailed" | "sandboxError" | { "responseStreamDisconnected": { httpStatusCode: number | null, } } | { "responseTooManyFailedAttempts": { httpStatusCode: number | null, } } | { "activeTurnNotSteerable": { turnKind: NonSteerableTurnKind, } } | "other";
@@ -70,6 +70,7 @@ pub enum NonSteerableTurnKind {
#[ts(export_to = "v2/")]
pub enum CodexErrorInfo {
ContextWindowExceeded,
RolloutBudgetExceeded,
UsageLimitExceeded,
ServerOverloaded,
CyberPolicy,
@@ -115,6 +116,7 @@ impl From<CoreCodexErrorInfo> for CodexErrorInfo {
fn from(value: CoreCodexErrorInfo) -> Self {
match value {
CoreCodexErrorInfo::ContextWindowExceeded => CodexErrorInfo::ContextWindowExceeded,
CoreCodexErrorInfo::RolloutBudgetExceeded => CodexErrorInfo::RolloutBudgetExceeded,
CoreCodexErrorInfo::UsageLimitExceeded => CodexErrorInfo::UsageLimitExceeded,
CoreCodexErrorInfo::ServerOverloaded => CodexErrorInfo::ServerOverloaded,
CoreCodexErrorInfo::CyberPolicy => CodexErrorInfo::CyberPolicy,
+1
View File
@@ -1423,6 +1423,7 @@ There are additional item-specific events:
`codexErrorInfo` maps to the `CodexErrorInfo` enum. Common values:
- `ContextWindowExceeded`
- `RolloutBudgetExceeded`
- `UsageLimitExceeded`
- `HttpConnectionFailed { httpStatusCode? }`: upstream HTTP failures including 4xx/5xx
- `ResponseStreamConnectionFailed { httpStatusCode? }`: failure to connect to the response SSE stream
+6
View File
@@ -275,6 +275,12 @@ async fn run_compact_task_inner_impl(
Err(err @ (CodexErr::Interrupted | CodexErr::TurnAborted)) => {
return Err(err);
}
Err(e @ CodexErr::RolloutBudgetExceeded) => {
sess.track_turn_codex_error(turn_context.as_ref(), &e);
let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None));
sess.send_event(&turn_context, event).await;
return Err(e);
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
+1 -1
View File
@@ -30,7 +30,7 @@ impl Session {
.rollout_budget()
.record_usage(usage)
{
return Err(CodexErr::TurnAborted);
return Err(CodexErr::RolloutBudgetExceeded);
}
Ok(())
}
+23 -24
View File
@@ -2,9 +2,9 @@ use anyhow::Result;
use codex_core::config::RolloutBudgetConfig;
use codex_features::Feature;
use codex_model_provider_info::built_in_model_providers;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
@@ -205,7 +205,7 @@ async fn subagent_usage_draws_from_the_shared_budget() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exhausted_budget_aborts_current_and_later_turns() -> Result<()> {
async fn exhausted_budget_fails_current_and_later_turns() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
@@ -248,18 +248,18 @@ async fn exhausted_budget_aborts_current_and_later_turns() -> Result<()> {
})
.await?;
let event = wait_for_event(&test.codex, |event| match event {
EventMsg::TurnAborted(_) => true,
EventMsg::TurnComplete(_) => {
panic!("exhausted budget completed the turn instead of aborting")
}
_ => false,
wait_for_event(&test.codex, |event| {
matches!(
event,
EventMsg::Error(error)
if error.codex_error_info == Some(CodexErrorInfo::RolloutBudgetExceeded)
)
})
.await;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let EventMsg::TurnAborted(abort) = event else {
unreachable!("event filter only accepts TurnAborted")
};
assert_eq!(abort.reason, TurnAbortReason::Interrupted);
}
Ok(())
@@ -268,7 +268,7 @@ async fn exhausted_budget_aborts_current_and_later_turns() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[test_case(false ; "local")]
#[test_case(true ; "remote_v2")]
async fn compaction_budget_exhaustion_aborts_without_error_or_retry(remote_v2: bool) -> Result<()> {
async fn compaction_budget_exhaustion_fails_without_retry(remote_v2: bool) -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
@@ -311,19 +311,18 @@ async fn compaction_budget_exhaustion_aborts_without_error_or_retry(remote_v2: b
.await?;
test.codex.submit(Op::Compact).await?;
let event = wait_for_event(&test.codex, |event| match event {
EventMsg::TurnAborted(_) => true,
EventMsg::Error(error) => panic!("budget exhaustion emitted an error: {}", error.message),
EventMsg::TurnComplete(_) => {
panic!("budget-exhausting compaction completed instead of aborting")
}
_ => false,
wait_for_event(&test.codex, |event| {
matches!(
event,
EventMsg::Error(error)
if error.codex_error_info == Some(CodexErrorInfo::RolloutBudgetExceeded)
)
})
.await;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let EventMsg::TurnAborted(abort) = event else {
unreachable!("event filter only accepts TurnAborted")
};
assert_eq!(abort.reason, TurnAbortReason::Interrupted);
assert_eq!(responses.requests().len(), 1, "compaction should not retry");
Ok(())
+5
View File
@@ -69,6 +69,9 @@ pub enum CodexErr {
#[error("turn aborted. Something went wrong? Hit `/feedback` to report the issue.")]
TurnAborted,
#[error("shared rollout token budget exhausted")]
RolloutBudgetExceeded,
/// Returned by ResponsesClient when the SSE stream disconnects or errors out **after** the HTTP
/// handshake has succeeded but **before** it finished emitting `response.completed`.
///
@@ -173,6 +176,7 @@ impl CodexErr {
pub fn is_retryable(&self) -> bool {
match self {
CodexErr::TurnAborted
| CodexErr::RolloutBudgetExceeded
| CodexErr::Interrupted
| CodexErr::EnvVar(_)
| CodexErr::Fatal(_)
@@ -220,6 +224,7 @@ impl CodexErr {
pub fn to_codex_protocol_error(&self) -> CodexErrorInfo {
match self {
CodexErr::ContextWindowExceeded => CodexErrorInfo::ContextWindowExceeded,
CodexErr::RolloutBudgetExceeded => CodexErrorInfo::RolloutBudgetExceeded,
CodexErr::UsageLimitReached(_)
| CodexErr::QuotaExceeded
| CodexErr::UsageNotIncluded => CodexErrorInfo::UsageLimitExceeded,
+2
View File
@@ -1690,6 +1690,7 @@ pub enum NonSteerableTurnKind {
#[ts(rename_all = "snake_case")]
pub enum CodexErrorInfo {
ContextWindowExceeded,
RolloutBudgetExceeded,
UsageLimitExceeded,
ServerOverloaded,
CyberPolicy,
@@ -1727,6 +1728,7 @@ impl CodexErrorInfo {
match self {
Self::ThreadRollbackFailed | Self::ActiveTurnNotSteerable { .. } => false,
Self::ContextWindowExceeded
| Self::RolloutBudgetExceeded
| Self::UsageLimitExceeded
| Self::ServerOverloaded
| Self::CyberPolicy