[codex] Retry transient Guardian review failures (#27062)

## Background

Codex can use **Auto Review** for permission requests. Instead of asking
the user immediately, Codex starts a separate locked-down reviewer
session called **Guardian**, which returns a structured `allow` or
`deny` assessment.

The Guardian reviewer is itself a Codex session, so its model request
can fail for transient infrastructure reasons such as model overload,
HTTP connection failure, or response-stream disconnect. Before this PR,
any such failure immediately ended the Auto Review attempt and blocked
the action.

This PR adds bounded retries for failures that the existing protocol
explicitly identifies as transient.

Linear context:
[CA-539](https://linear.app/openai/issue/CA-539/retry-auto-review-infrastructure-failures-and-fall-back-to-manual)

## What changes

A Guardian review can now make at most **three total attempts**:

1. Run the review normally.
2. Retry after a jittered delay of roughly 180–220 ms if the first
attempt fails with an eligible error.
3. Retry after a jittered delay of roughly 360–440 ms if the second
attempt also fails with an eligible error.

All attempts share the original review deadline. Jitter spreads retries
from concurrent clients to reduce synchronized load during broader
outages. The retries do not reset the user's maximum wait time, and the
backoff waits terminate early if the review is cancelled or the deadline
expires.
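
The delay ranges above are consistent with an exponential backoff that starts at 200 ms, doubles per attempt, and applies roughly ±10% jitter. The sketch below works under that assumption: the constants and the names `retry_delay_bounds` and `clamp_to_deadline` are illustrative and are not taken from the shared `backoff` helper, whose body does not appear in this diff. It shows the expected bounds per attempt and how a retry wake-up time can be clamped to the shared deadline.

```rust
use std::time::{Duration, Instant};

// Assumed constants, chosen to reproduce the 180-220 ms and 360-440 ms ranges.
const BASE_DELAY_MS: u64 = 200;
const JITTER_PERCENT: u64 = 10;

/// Returns the (minimum, maximum) delay before the retry that follows the
/// 1-based `failed_attempt`.
pub fn retry_delay_bounds(failed_attempt: u32) -> (Duration, Duration) {
    // Cap the exponent so the shift cannot overflow in this sketch.
    let exponent = failed_attempt.saturating_sub(1).min(16);
    let nominal_ms = BASE_DELAY_MS << exponent;
    let spread_ms = nominal_ms * JITTER_PERCENT / 100;
    (
        Duration::from_millis(nominal_ms - spread_ms),
        Duration::from_millis(nominal_ms + spread_ms),
    )
}

/// Clamps a retry wake-up time to the shared review deadline, so a backoff
/// wait never extends the user's maximum wait.
pub fn clamp_to_deadline(now: Instant, delay: Duration, deadline: Instant) -> Instant {
    (now + delay).min(deadline)
}
```

Clamping the wake-up time, rather than the delay itself, keeps the deadline an absolute point in time that every attempt and every wait can compare against.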

Before retrying, the existing Guardian session lifecycle decides whether
the session remains usable. Healthy trunks are reused, broken trunks are
removed by the existing cleanup path, and ephemeral sessions continue to
clean themselves up.

The review still emits one logical lifecycle to clients. Recoverable
intermediate failures do not produce warnings or terminal events.

## Retry policy

### Retried up to twice

- model/server overload
- HTTP connection failure
- response-stream connection failure
- response-stream disconnect
- internal server error
- a final reviewer message that cannot be parsed as the required
Guardian assessment

### Not retried

- bad or invalid requests
- authentication failures
- usage limits
- cyber-policy failures
- errors without a structured category
- a request that already exhausted the lower-level Responses retry
budget
- a completed Guardian turn with no assessment payload
- prompt-construction failures
- Guardian review timeout
- cancellation or abort
- a valid `deny` assessment

The session-error classification uses `ErrorEvent.codex_error_info`; it
does not inspect error-message strings.
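
As a rough illustration of this policy, the sketch below classifies failures by structured category only. `ErrorCategory` and `ReviewFailure` are simplified, hypothetical stand-ins for `CodexErrorInfo` and `GuardianReviewError`; the real types have more variants, and some carry data.

```rust
/// Hypothetical, simplified stand-in for the structured error category.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCategory {
    ServerOverloaded,
    HttpConnectionFailed,
    ResponseStreamConnectionFailed,
    ResponseStreamDisconnected,
    InternalServerError,
    BadRequest,
    Unauthorized,
    UsageLimitExceeded,
}

/// Hypothetical, simplified stand-in for a failed review attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReviewFailure {
    Session { message: String, category: Option<ErrorCategory> },
    Parse { message: String },
    PromptBuild { message: String },
    Timeout,
    Cancelled,
}

/// Decides retry eligibility from the structured category alone; the
/// `message` text is never inspected.
pub fn is_retryable(failure: &ReviewFailure) -> bool {
    match failure {
        ReviewFailure::Session { category: Some(category), .. } => matches!(
            category,
            ErrorCategory::ServerOverloaded
                | ErrorCategory::HttpConnectionFailed
                | ErrorCategory::ResponseStreamConnectionFailed
                | ErrorCategory::ResponseStreamDisconnected
                | ErrorCategory::InternalServerError
        ),
        // An unparseable final reviewer message is retried.
        ReviewFailure::Parse { .. } => true,
        // Uncategorized session errors and deterministic failures are not.
        ReviewFailure::Session { category: None, .. }
        | ReviewFailure::PromptBuild { .. }
        | ReviewFailure::Timeout
        | ReviewFailure::Cancelled => false,
    }
}
```

Matching on an explicit allowlist means a category added later is not retried until someone opts it in, which fits the fail-closed behavior described above.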

## Implementation notes

- `wait_for_guardian_review` preserves the complete `ErrorEvent`,
including structured `codex_error_info`.
- Guardian session failures preserve the original message and optional
structured `CodexErrorInfo`.
- The retry policy classifies the explicitly transient `CodexErrorInfo`
variants; unknown, absent, and deterministic categories are not retried.
- The Guardian session manager receives the caller's deadline rather
than creating a new timeout per attempt.
- Analytics record the final `attempt_count`.
- Retry orchestration does not add a separate session-cleanup protocol;
it relies on the existing trunk and ephemeral lifecycle decisions.
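
The control flow these notes describe can be sketched as a bounded loop over one shared deadline. This is a simplified, synchronous approximation: `Outcome`, `run_with_retry`, and the `delay_for` parameter are hypothetical names, a blocking sleep stands in for the async wait, and cancellation is left out.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified outcome of one review attempt.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Approved,
    Denied,
    TransientFailure,
    TimedOut,
}

/// Runs `attempt` up to `max_attempts` times against one shared deadline and
/// returns the final outcome together with the number of attempts made.
pub fn run_with_retry(
    max_attempts: u32,
    deadline: Instant,
    mut attempt: impl FnMut(Instant) -> Outcome,
    delay_for: impl Fn(u32) -> Duration,
) -> (Outcome, u32) {
    assert!(max_attempts > 0, "review must run at least once");
    let mut attempt_count = 1;
    loop {
        // Every attempt receives the same deadline; none creates its own.
        let outcome = attempt(deadline);
        if attempt_count >= max_attempts || outcome != Outcome::TransientFailure {
            return (outcome, attempt_count);
        }
        // Never sleep past the shared deadline.
        let wake_at = (Instant::now() + delay_for(attempt_count)).min(deadline);
        std::thread::sleep(wake_at.saturating_duration_since(Instant::now()));
        if Instant::now() >= deadline {
            return (Outcome::TimedOut, attempt_count);
        }
        attempt_count += 1;
    }
}
```

Returning the attempt count alongside the outcome is what lets the caller record a single final `attempt_count` without emitting anything for intermediate failures.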

## Automated testing

Focused Guardian coverage verifies:

- every supported transient `CodexErrorInfo` is classified as retryable,
while absent and non-transient categories are not;
- structured transient session failure -> retry -> approval with the
healthy trunk reused;
- two invalid Guardian responses -> third attempt -> approval, with
exactly three requests;
- three invalid responses -> existing fail-closed result, with exactly
three requests and one terminal lifecycle;
- valid denial, missing payload, invalid request, timeout, cancellation,
and prompt/session construction failures are not retried;
- retry eligibility ends after the third attempt;
- retry delays use the shared exponential backoff helper and remain
within the expected jitter bounds;
- cancellation and deadline expiry interrupt the backoff wait;
- healthy trunks are reused across retryable failures;
- broken event streams remove the trunk through the existing lifecycle
cleanup;
- an ephemeral retry does not disturb a concurrent trunk review.

Validation performed:

- `just test -p codex-core guardian_review_
guardian_ephemeral_retry_preserves_parallel_trunk_and_fork_history
run_review_removes_trunk_when_event_stream_is_broken` — **42 passed**;
- `just test -p codex-analytics` — **71 passed**;
- scoped Clippy fixes for `codex-core` and `codex-analytics` passed.

A prior full `codex-core` run had unrelated environment-sensitive
failures outside Guardian coverage.

## Manual QA

The focused integration tests use the local mock Responses server to
inspect exact request counts and emitted lifecycle events. They confirm
that retries are internal, a successful later attempt supplies the final
decision, non-retryable failures issue only one request, and exhausted
retries emit only one terminal result.
kbazzi
2026-06-10 11:46:57 -07:00
committed by GitHub
parent 42415443d0
commit ccf1a18518
6 changed files with 660 additions and 40 deletions
@@ -1937,6 +1937,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
decision: GuardianReviewDecision::Denied,
terminal_status: GuardianReviewTerminalStatus::TimedOut,
failure_reason: Some(GuardianReviewFailureReason::Timeout),
attempt_count: 1,
risk_level: None,
user_authorization: None,
outcome: None,
@@ -2008,6 +2009,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
);
assert_eq!(payload[0]["event_params"]["terminal_status"], "timed_out");
assert_eq!(payload[0]["event_params"]["failure_reason"], "timeout");
assert_eq!(payload[0]["event_params"]["attempt_count"], 1);
assert_eq!(payload[0]["event_params"]["review_timeout_ms"], 90_000);
}
+4
@@ -266,6 +266,7 @@ pub struct GuardianReviewEventParams {
pub decision: GuardianReviewDecision,
pub terminal_status: GuardianReviewTerminalStatus,
pub failure_reason: Option<GuardianReviewFailureReason>,
pub attempt_count: i64,
pub risk_level: Option<GuardianRiskLevel>,
pub user_authorization: Option<GuardianUserAuthorization>,
pub outcome: Option<GuardianAssessmentOutcome>,
@@ -338,6 +339,7 @@ impl GuardianReviewTrackContext {
decision: result.decision,
terminal_status: result.terminal_status,
failure_reason: result.failure_reason,
attempt_count: result.attempt_count,
risk_level: result.risk_level,
user_authorization: result.user_authorization,
outcome: result.outcome,
@@ -373,6 +375,7 @@ pub struct GuardianReviewAnalyticsResult {
pub decision: GuardianReviewDecision,
pub terminal_status: GuardianReviewTerminalStatus,
pub failure_reason: Option<GuardianReviewFailureReason>,
pub attempt_count: i64,
pub risk_level: Option<GuardianRiskLevel>,
pub user_authorization: Option<GuardianUserAuthorization>,
pub outcome: Option<GuardianAssessmentOutcome>,
@@ -392,6 +395,7 @@ impl GuardianReviewAnalyticsResult {
decision: GuardianReviewDecision::Denied,
terminal_status: GuardianReviewTerminalStatus::FailedClosed,
failure_reason: None,
attempt_count: 1,
risk_level: None,
user_authorization: None,
outcome: None,
+1 -1
@@ -168,7 +168,7 @@ use prompt::render_guardian_transcript_entries;
#[cfg(test)]
use review::GuardianReviewOutcome;
#[cfg(test)]
-use review::run_guardian_review_session as run_guardian_review_session_for_test;
+use review::run_guardian_review_session_with_retry as run_guardian_review_session_for_test;
#[cfg(test)]
use review_session::build_guardian_review_session_config as build_guardian_review_session_config_for_test;
+224 -10
@@ -7,6 +7,7 @@ use codex_analytics::GuardianReviewTrackContext;
use codex_analytics::GuardianReviewedAction;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GuardianAssessmentDecisionSource;
use codex_protocol::protocol::GuardianAssessmentEvent;
@@ -19,11 +20,14 @@ use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::WarningEvent;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tokio::time::sleep_until;
use tokio_util::sync::CancellationToken;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::turn_timing::now_unix_timestamp_ms;
use crate::util::backoff;
use super::AUTO_REVIEW_DENIAL_WINDOW_SIZE;
use super::GUARDIAN_REVIEW_TIMEOUT;
@@ -58,6 +62,8 @@ const GUARDIAN_TIMEOUT_INSTRUCTIONS: &str = concat!(
"You may retry once, or ask the user for guidance or explicit approval.",
);
const GUARDIAN_REVIEW_MAX_ATTEMPTS: i64 = 3;
pub(crate) fn new_guardian_review_id() -> String {
uuid::Uuid::new_v4().to_string()
}
@@ -95,9 +101,16 @@ pub(super) enum GuardianReviewOutcome {
#[derive(Debug)]
pub(super) enum GuardianReviewError {
-PromptBuild { message: String },
-Session { message: String },
-Parse { message: String },
+PromptBuild {
+message: String,
+},
+Session {
+message: String,
+error_info: Option<CodexErrorInfo>,
+},
+Parse {
+message: String,
+},
Timeout,
Cancelled,
}
@@ -112,6 +125,14 @@ impl GuardianReviewError {
fn session(err: anyhow::Error) -> Self {
Self::Session {
message: err.to_string(),
error_info: None,
}
}
fn session_with_error_info(err: anyhow::Error, error_info: CodexErrorInfo) -> Self {
Self::Session {
message: err.to_string(),
error_info: Some(error_info),
}
}
@@ -335,13 +356,14 @@ async fn run_guardian_review(
let schema = guardian_output_schema();
let terminal_action = action_summary.clone();
-let (outcome, analytics_result) = Box::pin(run_guardian_review_session(
+let (outcome, analytics_result) = Box::pin(run_guardian_review_session_with_retry(
session.clone(),
turn.clone(),
request,
retry_reason.clone(),
schema,
external_cancel,
GUARDIAN_REVIEW_MAX_ATTEMPTS,
))
.await;
@@ -464,7 +486,7 @@ async fn run_guardian_review(
| GuardianReviewError::Parse { .. } => {
let message = match &error {
GuardianReviewError::PromptBuild { message }
-| GuardianReviewError::Session { message }
+| GuardianReviewError::Session { message, .. }
| GuardianReviewError::Parse { message } => message,
GuardianReviewError::Timeout | GuardianReviewError::Cancelled => {
"guardian review failed"
@@ -657,13 +679,14 @@ pub(crate) fn spawn_approval_request_review(
/// context. It may still reuse the parent's managed-network allowlist for
/// read-only checks, but it intentionally runs without inherited exec-policy
/// rules.
-pub(super) async fn run_guardian_review_session(
+async fn run_guardian_review_session_before_deadline(
session: Arc<Session>,
turn: Arc<TurnContext>,
request: GuardianApprovalRequest,
retry_reason: Option<String>,
schema: serde_json::Value,
external_cancel: Option<CancellationToken>,
deadline: Instant,
) -> (GuardianReviewOutcome, GuardianReviewAnalyticsResult) {
let network_proxy = session.services.network_proxy.load_full();
let live_network_config = match network_proxy.as_ref() {
@@ -753,6 +776,7 @@ pub(super) async fn run_guardian_review_session(
reasoning_summary: turn.reasoning_summary,
personality: turn.personality,
external_cancel,
deadline,
}),
)
.await;
@@ -787,10 +811,16 @@ pub(super) async fn run_guardian_review_session(
GuardianReviewOutcome::Error(GuardianReviewError::prompt_build(err)),
session_analytics_result,
),
-GuardianReviewSessionOutcome::SessionFailed(err) => (
-GuardianReviewOutcome::Error(GuardianReviewError::session(err)),
-session_analytics_result,
-),
+GuardianReviewSessionOutcome::SessionFailed { error, error_info } => {
+let error = match error_info {
+Some(error_info) => GuardianReviewError::session_with_error_info(error, error_info),
+None => GuardianReviewError::session(error),
+};
+(
+GuardianReviewOutcome::Error(error),
+session_analytics_result,
+)
+}
GuardianReviewSessionOutcome::TimedOut => (
GuardianReviewOutcome::Error(GuardianReviewError::Timeout),
session_analytics_result,
@@ -802,9 +832,85 @@ pub(super) async fn run_guardian_review_session(
}
}
pub(super) async fn run_guardian_review_session_with_retry(
session: Arc<Session>,
turn: Arc<TurnContext>,
request: GuardianApprovalRequest,
retry_reason: Option<String>,
schema: serde_json::Value,
external_cancel: Option<CancellationToken>,
max_attempts: i64,
) -> (GuardianReviewOutcome, GuardianReviewAnalyticsResult) {
assert!(max_attempts > 0, "guardian review must run at least once");
let deadline = Instant::now() + GUARDIAN_REVIEW_TIMEOUT;
let mut attempt_count = 1;
loop {
let (outcome, mut analytics_result) = run_guardian_review_session_before_deadline(
Arc::clone(&session),
Arc::clone(&turn),
request.clone(),
retry_reason.clone(),
schema.clone(),
external_cancel.clone(),
deadline,
)
.await;
analytics_result.attempt_count = attempt_count;
if attempt_count >= max_attempts || !should_retry_guardian_review(&outcome) {
return (outcome, analytics_result);
}
if let Some(error) =
wait_before_guardian_retry(attempt_count, deadline, external_cancel.as_ref()).await
{
return (GuardianReviewOutcome::Error(error), analytics_result);
}
attempt_count += 1;
}
}
async fn wait_before_guardian_retry(
attempt_count: i64,
deadline: Instant,
external_cancel: Option<&CancellationToken>,
) -> Option<GuardianReviewError> {
let retry_delay = backoff(attempt_count as u64);
let retry_at = (Instant::now() + retry_delay).min(deadline);
tokio::select! {
_ = sleep_until(retry_at) => {
(Instant::now() >= deadline).then_some(GuardianReviewError::Timeout)
}
_ = async {
if let Some(cancel_token) = external_cancel {
cancel_token.cancelled().await;
} else {
std::future::pending::<()>().await;
}
} => Some(GuardianReviewError::Cancelled),
}
}
fn should_retry_guardian_review(outcome: &GuardianReviewOutcome) -> bool {
matches!(
outcome,
GuardianReviewOutcome::Error(
GuardianReviewError::Session {
error_info: Some(
CodexErrorInfo::ServerOverloaded
| CodexErrorInfo::HttpConnectionFailed { .. }
| CodexErrorInfo::ResponseStreamConnectionFailed { .. }
| CodexErrorInfo::InternalServerError
| CodexErrorInfo::ResponseStreamDisconnected { .. }
),
..
} | GuardianReviewError::Parse { .. }
)
)
}
#[cfg(test)]
mod review_tests {
use super::*;
use std::time::Duration;
#[test]
fn guardian_review_error_reason_distinguishes_error_kinds() {
@@ -812,6 +918,10 @@ mod review_tests {
let prompt_error = GuardianReviewError::prompt_build(anyhow::anyhow!("bad prompt/config"));
let session_error =
GuardianReviewError::session(anyhow::anyhow!("guardian runtime failed"));
let structured_session_error = GuardianReviewError::session_with_error_info(
anyhow::anyhow!("temporary guardian failure"),
CodexErrorInfo::ServerOverloaded,
);
assert!(matches!(
parse_error.failure_reason(),
@@ -825,5 +935,109 @@ mod review_tests {
session_error.failure_reason(),
GuardianReviewFailureReason::SessionError
));
assert!(matches!(
structured_session_error.failure_reason(),
GuardianReviewFailureReason::SessionError
));
}
#[test]
fn guardian_review_retry_only_retries_transient_session_and_parse_errors() {
let assessment = GuardianAssessment {
risk_level: GuardianRiskLevel::High,
user_authorization: GuardianUserAuthorization::Unknown,
outcome: GuardianAssessmentOutcome::Deny,
rationale: "deny".to_string(),
};
let transient_error_info = [
CodexErrorInfo::ServerOverloaded,
CodexErrorInfo::HttpConnectionFailed {
http_status_code: Some(502),
},
CodexErrorInfo::ResponseStreamConnectionFailed {
http_status_code: Some(503),
},
CodexErrorInfo::InternalServerError,
CodexErrorInfo::ResponseStreamDisconnected {
http_status_code: None,
},
];
let mut outcomes = transient_error_info
.into_iter()
.map(|error_info| {
(
GuardianReviewOutcome::Error(GuardianReviewError::session_with_error_info(
anyhow::anyhow!("transient session"),
error_info,
)),
true,
)
})
.collect::<Vec<_>>();
outcomes.extend([
(GuardianReviewOutcome::Completed(assessment), false),
(
GuardianReviewOutcome::Error(GuardianReviewError::prompt_build(anyhow::anyhow!(
"prompt"
))),
false,
),
(
GuardianReviewOutcome::Error(GuardianReviewError::session(anyhow::anyhow!(
"session"
))),
false,
),
(
GuardianReviewOutcome::Error(GuardianReviewError::session_with_error_info(
anyhow::anyhow!("bad request"),
CodexErrorInfo::BadRequest,
)),
false,
),
(
GuardianReviewOutcome::Error(GuardianReviewError::parse(anyhow::anyhow!("parse"))),
true,
),
(
GuardianReviewOutcome::Error(GuardianReviewError::Timeout),
false,
),
(
GuardianReviewOutcome::Error(GuardianReviewError::Cancelled),
false,
),
]);
for (outcome, expected) in outcomes {
assert_eq!(should_retry_guardian_review(&outcome), expected);
}
}
#[tokio::test]
async fn guardian_review_retry_wait_honors_cancellation() {
let cancel_token = CancellationToken::new();
cancel_token.cancel();
let error = wait_before_guardian_retry(
/*attempt_count*/ 1,
Instant::now() + Duration::from_secs(/*secs*/ 1),
Some(&cancel_token),
)
.await;
assert!(matches!(error, Some(GuardianReviewError::Cancelled)));
}
#[tokio::test]
async fn guardian_review_retry_wait_honors_deadline() {
let error = wait_before_guardian_retry(
/*attempt_count*/ 1,
Instant::now(),
/*external_cancel*/ None,
)
.await;
assert!(matches!(error, Some(GuardianReviewError::Timeout)));
}
}
+92 -10
@@ -15,6 +15,8 @@ use codex_protocol::models::PermissionProfile;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InitialHistory;
@@ -46,7 +48,6 @@ use codex_features::Feature;
use codex_model_provider_info::ModelProviderInfo;
use codex_utils_absolute_path::AbsolutePathBuf;
-use super::GUARDIAN_REVIEW_TIMEOUT;
use super::GUARDIAN_REVIEWER_NAME;
use super::GuardianApprovalRequest;
use super::prompt::GuardianPromptMode;
@@ -60,7 +61,10 @@ const GUARDIAN_INTERRUPT_DRAIN_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) enum GuardianReviewSessionOutcome {
Completed(anyhow::Result<Option<String>>),
PromptBuildFailed(anyhow::Error),
-SessionFailed(anyhow::Error),
+SessionFailed {
+error: anyhow::Error,
+error_info: Option<CodexErrorInfo>,
+},
TimedOut,
Aborted,
}
@@ -77,6 +81,7 @@ pub(crate) struct GuardianReviewSessionParams {
pub(crate) reasoning_summary: ReasoningSummaryConfig,
pub(crate) personality: Option<Personality>,
pub(crate) external_cancel: Option<CancellationToken>,
pub(crate) deadline: tokio::time::Instant,
}
#[derive(Default)]
@@ -312,7 +317,7 @@ impl GuardianReviewSessionManager {
&self,
params: GuardianReviewSessionParams,
) -> (GuardianReviewSessionOutcome, GuardianReviewAnalyticsResult) {
-let deadline = tokio::time::Instant::now() + GUARDIAN_REVIEW_TIMEOUT;
+let deadline = params.deadline;
let next_reuse_key = GuardianReviewSessionReuseKey::from_spawn_config(&params.spawn_config);
let mut stale_trunk_to_shutdown = None;
let mut spawned_trunk = false;
@@ -364,7 +369,9 @@ impl GuardianReviewSessionManager {
state.trunk.as_ref().cloned()
}
-Err(outcome) => return (outcome, GuardianReviewAnalyticsResult::without_session()),
+Err(outcome) => {
+return (outcome, GuardianReviewAnalyticsResult::without_session());
+}
};
if let Some(review_session) = stale_trunk_to_shutdown {
@@ -560,7 +567,9 @@ impl GuardianReviewSessionManager {
GuardianReviewAnalyticsResult::without_session(),
);
}
-Err(outcome) => return (outcome, GuardianReviewAnalyticsResult::without_session()),
+Err(outcome) => {
+return (outcome, GuardianReviewAnalyticsResult::without_session());
+}
};
self.register_active_ephemeral(Arc::clone(&review_session))
.await;
@@ -764,7 +773,10 @@ async fn run_review_on_session(
Ok(Ok(child_turn_id)) => child_turn_id,
Ok(Err(err)) => {
return (
-GuardianReviewSessionOutcome::SessionFailed(err.into()),
+GuardianReviewSessionOutcome::SessionFailed {
+error: err.into(),
+error_info: None,
+},
false,
analytics_result,
);
@@ -825,7 +837,7 @@ async fn wait_for_guardian_review(
) -> (GuardianReviewSessionOutcome, bool, bool) {
let timeout = tokio::time::sleep_until(deadline);
tokio::pin!(timeout);
-let mut last_error_message: Option<String> = None;
+let mut last_error: Option<ErrorEvent> = None;
loop {
tokio::select! {
@@ -862,10 +874,13 @@ async fn wait_for_guardian_review(
.time_to_first_token_ms
.and_then(|ms| u64::try_from(ms).ok());
if turn_complete.last_agent_message.is_none()
-&& let Some(error_message) = last_error_message
+&& let Some(error) = last_error
{
return (
-GuardianReviewSessionOutcome::Completed(Err(anyhow!(error_message))),
+GuardianReviewSessionOutcome::SessionFailed {
+error: anyhow!(error.message),
+error_info: error.codex_error_info,
+},
true,
true,
);
@@ -877,7 +892,7 @@ async fn wait_for_guardian_review(
);
}
EventMsg::Error(error) => {
-last_error_message = Some(error.message);
+last_error = Some(error);
}
EventMsg::TurnAborted(_) => {
return (GuardianReviewSessionOutcome::Aborted, true, false);
@@ -920,6 +935,8 @@ pub(crate) fn build_guardian_review_session_config(
let mut guardian_config = parent_config.clone();
guardian_config.model = Some(active_model.to_string());
guardian_config.model_reasoning_effort = reasoning_effort;
guardian_config.model_provider.request_max_retries = Some(1);
guardian_config.model_provider.stream_max_retries = Some(1);
guardian_config.include_skill_instructions = false;
guardian_config.base_instructions = Some(
parent_config
@@ -1148,6 +1165,7 @@ mod tests {
reasoning_summary,
personality,
external_cancel: None,
deadline: tokio::time::Instant::now() + Duration::from_secs(30),
}
}
@@ -1473,6 +1491,29 @@ mod tests {
assert!(keep_review_session);
}
#[tokio::test]
async fn run_review_removes_trunk_when_event_stream_is_broken() {
let (mut review_session, tx_event, _rx_sub) = test_review_session().await;
let params = test_review_params().await;
review_session.reuse_key =
GuardianReviewSessionReuseKey::from_spawn_config(&params.spawn_config);
let manager = GuardianReviewSessionManager {
state: Arc::new(Mutex::new(GuardianReviewSessionState {
trunk: Some(Arc::new(review_session)),
ephemeral_reviews: Vec::new(),
})),
};
drop(tx_event);
let (outcome, _) = manager.run_review(params).await;
assert!(matches!(
outcome,
GuardianReviewSessionOutcome::Completed(Err(_))
));
assert!(manager.state.lock().await.trunk.is_none());
}
#[tokio::test]
async fn wait_for_guardian_review_ignores_prior_turn_completion() {
let (review_session, tx_event, _rx_sub) = test_review_session().await;
@@ -1545,6 +1586,47 @@ mod tests {
assert!(capture_token_usage);
}
#[tokio::test]
async fn wait_for_guardian_review_preserves_structured_session_error() {
let (review_session, tx_event, _rx_sub) = test_review_session().await;
tx_event
.send(Event {
id: "current-turn".to_string(),
msg: EventMsg::Error(ErrorEvent {
message: "temporary failure".to_string(),
codex_error_info: Some(CodexErrorInfo::ServerOverloaded),
}),
})
.await
.expect("queue guardian error");
tx_event
.send(turn_complete_event(
"current-turn",
/*last_agent_message*/ None,
Some(42),
))
.await
.expect("queue current turn completion");
let mut analytics_result = GuardianReviewAnalyticsResult::without_session();
let (outcome, keep_review_session, capture_token_usage) = wait_for_guardian_review(
&review_session,
"current-turn",
tokio::time::Instant::now() + Duration::from_secs(1),
/*external_cancel*/ None,
&mut analytics_result,
)
.await;
let GuardianReviewSessionOutcome::SessionFailed { error, error_info } = outcome else {
panic!("expected structured session failure");
};
assert_eq!(error.to_string(), "temporary failure");
assert_eq!(error_info, Some(CodexErrorInfo::ServerOverloaded));
assert!(keep_review_session);
assert!(capture_token_usage);
}
#[tokio::test]
async fn wait_for_guardian_review_ignores_prior_turn_aborts() {
let (review_session, tx_event, _rx_sub) = test_review_session().await;
+337 -19
@@ -55,10 +55,11 @@ use core_test_support::context_snapshot::ContextSnapshotOptions;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
-use core_test_support::responses::mount_response_once;
+use core_test_support::responses::mount_response_sequence;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::sse_failed;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::streaming_sse::StreamingSseChunk;
@@ -172,6 +173,51 @@ async fn guardian_test_session_and_turn(
guardian_test_session_and_turn_with_base_url(server.uri().as_str()).await
}
async fn guardian_test_session_turn_and_rx(
server: &wiremock::MockServer,
) -> (
Arc<Session>,
Arc<TurnContext>,
async_channel::Receiver<Event>,
) {
let (mut session, mut turn, rx) =
crate::session::tests::make_session_and_context_with_rx().await;
Arc::get_mut(&mut session)
.expect("session should be uniquely owned")
.thread_id = fixed_guardian_parent_session_id();
let mut config = (*turn.config).clone();
config.model_provider.base_url = Some(format!("{}/v1", server.uri()));
config.user_instructions = None;
let config = Arc::new(config);
let models_manager = test_support::models_manager_with_provider(
config.codex_home.to_path_buf(),
Arc::clone(&session.services.auth_manager),
config.model_provider.clone(),
);
Arc::get_mut(&mut session)
.expect("session should be uniquely owned")
.services
.models_manager = models_manager;
let turn_mut = Arc::get_mut(&mut turn).expect("turn should be uniquely owned");
turn_mut.config = Arc::clone(&config);
turn_mut.provider =
create_model_provider(config.model_provider.clone(), turn_mut.auth_manager.clone());
turn_mut.user_instructions = None;
(session, turn, rx)
}
fn guardian_shell_request(id: &str) -> GuardianApprovalRequest {
GuardianApprovalRequest::Shell {
id: id.to_string(),
command: vec!["git".to_string(), "push".to_string()],
cwd: test_path_buf("/repo/codex-rs/core").abs(),
sandbox_permissions: crate::sandboxing::SandboxPermissions::UseDefault,
additional_permissions: None,
justification: Some("Need to push the reviewed docs fix.".to_string()),
}
}
async fn guardian_test_session_and_turn_with_base_url(
base_url: &str,
) -> (Arc<Session>, Arc<TurnContext>) {
@@ -1343,6 +1389,7 @@ async fn guardian_request_model_for_auto_review_override(
Some("Sandbox denied outbound git push to github.com.".to_string()),
guardian_output_schema(),
/*external_cancel*/ None,
/*max_attempts*/ 1,
)
.await;
let (GuardianReviewOutcome::Completed(_), _) = outcome else {
@@ -1454,6 +1501,7 @@ async fn guardian_review_request_layout_matches_model_visible_request_snapshot()
Some("Sandbox denied outbound git push to github.com.".to_string()),
guardian_output_schema(),
/*external_cancel*/ None,
/*max_attempts*/ 1,
)
.await;
let (GuardianReviewOutcome::Completed(assessment), metadata) = outcome else {
@@ -1632,6 +1680,7 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow:
Some("First retry reason".to_string()),
guardian_output_schema(),
/*external_cancel*/ None,
/*max_attempts*/ 1,
)
.await;
session
@@ -1676,6 +1725,7 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow:
Some("Second retry reason".to_string()),
guardian_output_schema(),
/*external_cancel*/ None,
/*max_attempts*/ 1,
)
.await;
session
@@ -1716,6 +1766,7 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow:
Some("Third retry reason".to_string()),
guardian_output_schema(),
/*external_cancel*/ None,
/*max_attempts*/ 1,
)
.await;
@@ -1907,6 +1958,7 @@ async fn guardian_reused_trunk_ignores_stale_prior_turn_completion() -> anyhow::
/*retry_reason*/ None,
guardian_output_schema(),
/*external_cancel*/ None,
/*max_attempts*/ 1,
)
.await;
let (GuardianReviewOutcome::Completed(first_assessment), first_metadata) = first_outcome else {
@@ -1949,6 +2001,7 @@ async fn guardian_reused_trunk_ignores_stale_prior_turn_completion() -> anyhow::
/*retry_reason*/ None,
guardian_output_schema(),
/*external_cancel*/ None,
/*max_attempts*/ 1,
)
.await;
let (GuardianReviewOutcome::Completed(second_assessment), second_metadata) = second_outcome
@@ -1978,15 +2031,17 @@ async fn guardian_review_surfaces_responses_api_errors_in_rejection_reason() ->
let server = start_mock_server().await;
let error_message =
"Item 'rs_test' of type 'reasoning' was provided without its required following item.";
-let _request_log = mount_response_once(
+let request_log = mount_response_sequence(
&server,
-wiremock::ResponseTemplate::new(400).set_body_json(serde_json::json!({
-"error": {
-"message": error_message,
-"type": "invalid_request_error",
-"param": "input"
-}
-})),
+vec![
+wiremock::ResponseTemplate::new(400).set_body_json(serde_json::json!({
+"error": {
+"message": error_message,
+"type": "invalid_request_error",
+"param": "input"
+}
+})),
+],
)
.await;
@@ -2030,6 +2085,7 @@ async fn guardian_review_surfaces_responses_api_errors_in_rejection_reason() ->
.await;
assert_eq!(decision, ReviewDecision::Denied);
assert_eq!(request_log.requests().len(), 1);
let mut warnings = Vec::new();
let mut denial_rationales = Vec::new();
@@ -2080,13 +2136,255 @@ async fn guardian_review_surfaces_responses_api_errors_in_rejection_reason() ->
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn guardian_review_retries_transient_session_failure_then_approves() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let approval = serde_json::json!({
"risk_level": "low",
"user_authorization": "high",
"outcome": "allow",
"rationale": "retry succeeded",
})
.to_string();
let request_log = mount_sse_sequence(
&server,
vec![
sse_failed(
"resp-session-failure",
"server_is_overloaded",
"temporary reviewer overload",
),
sse(vec![
ev_response_created("resp-approved"),
ev_assistant_message("msg-approved", &approval),
ev_completed("resp-approved"),
]),
],
)
.await;
let (session, turn) = guardian_test_session_and_turn(&server).await;
seed_guardian_parent_history(&session, &turn).await;
let (outcome, metadata) = run_guardian_review_session_for_test(
Arc::clone(&session),
Arc::clone(&turn),
guardian_shell_request("shell-session-retry"),
/*retry_reason*/ None,
guardian_output_schema(),
/*external_cancel*/ None,
/*max_attempts*/ 3,
)
.await;
let GuardianReviewOutcome::Completed(assessment) = outcome else {
panic!("expected guardian assessment");
};
assert_eq!(assessment.outcome, GuardianAssessmentOutcome::Allow);
assert_eq!(assessment.rationale, "retry succeeded");
assert_eq!(metadata.attempt_count, 2);
assert!(matches!(
metadata.guardian_session_kind,
Some(codex_analytics::GuardianReviewSessionKind::TrunkReused)
));
assert_eq!(request_log.requests().len(), 2);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn guardian_review_does_not_retry_missing_assessment_payload() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let request_log = mount_sse_sequence(
&server,
vec![sse(vec![
ev_response_created("resp-missing-assessment"),
ev_completed("resp-missing-assessment"),
])],
)
.await;
let (session, turn) = guardian_test_session_and_turn(&server).await;
seed_guardian_parent_history(&session, &turn).await;
let decision = review_approval_request(
&session,
&turn,
"review-missing-assessment".to_string(),
guardian_shell_request("shell-missing-assessment"),
/*retry_reason*/ None,
)
.await;
assert_eq!(decision, ReviewDecision::Denied);
assert_eq!(request_log.requests().len(), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn guardian_review_retries_two_parse_failures_then_approves() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let approval = serde_json::json!({
"risk_level": "low",
"user_authorization": "high",
"outcome": "allow",
"rationale": "retry succeeded",
})
.to_string();
let request_log = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("resp-parse-failure-1"),
ev_assistant_message("msg-parse-failure-1", "not valid guardian json"),
ev_completed("resp-parse-failure-1"),
]),
sse(vec![
ev_response_created("resp-parse-failure-2"),
ev_assistant_message("msg-parse-failure-2", "still not valid guardian json"),
ev_completed("resp-parse-failure-2"),
]),
sse(vec![
ev_response_created("resp-approved"),
ev_assistant_message("msg-approved", &approval),
ev_completed("resp-approved"),
]),
],
)
.await;
let (session, turn) = guardian_test_session_and_turn(&server).await;
seed_guardian_parent_history(&session, &turn).await;
let (outcome, metadata) = run_guardian_review_session_for_test(
Arc::clone(&session),
Arc::clone(&turn),
guardian_shell_request("shell-parse-retry"),
/*retry_reason*/ None,
guardian_output_schema(),
/*external_cancel*/ None,
/*max_attempts*/ 3,
)
.await;
let GuardianReviewOutcome::Completed(assessment) = outcome else {
panic!("expected guardian assessment");
};
assert_eq!(assessment.outcome, GuardianAssessmentOutcome::Allow);
assert_eq!(assessment.rationale, "retry succeeded");
assert_eq!(metadata.attempt_count, 3);
assert!(matches!(
metadata.guardian_session_kind,
Some(codex_analytics::GuardianReviewSessionKind::TrunkReused)
));
assert_eq!(request_log.requests().len(), 3);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn guardian_review_exhausts_three_failures_with_one_terminal_event() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let request_log = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("resp-parse-failure-1"),
ev_assistant_message("msg-parse-failure-1", "invalid one"),
ev_completed("resp-parse-failure-1"),
]),
sse(vec![
ev_response_created("resp-parse-failure-2"),
ev_assistant_message("msg-parse-failure-2", "invalid two"),
ev_completed("resp-parse-failure-2"),
]),
sse(vec![
ev_response_created("resp-parse-failure-3"),
ev_assistant_message("msg-parse-failure-3", "invalid three"),
ev_completed("resp-parse-failure-3"),
]),
],
)
.await;
let (session, turn, rx) = guardian_test_session_turn_and_rx(&server).await;
seed_guardian_parent_history(&session, &turn).await;
let decision = review_approval_request(
&session,
&turn,
"review-exhausted-retry".to_string(),
guardian_shell_request("shell-exhausted-retry"),
/*retry_reason*/ None,
)
.await;
assert_eq!(decision, ReviewDecision::Denied);
assert_eq!(request_log.requests().len(), 3);
let mut statuses = Vec::new();
while let Ok(event) = rx.try_recv() {
if let EventMsg::GuardianAssessment(event) = event.msg {
statuses.push(event.status);
}
}
assert_eq!(
statuses,
vec![
GuardianAssessmentStatus::InProgress,
GuardianAssessmentStatus::Denied,
]
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn guardian_review_does_not_retry_valid_denial() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let denial = serde_json::json!({
"risk_level": "high",
"user_authorization": "unknown",
"outcome": "deny",
"rationale": "unsafe",
})
.to_string();
let request_log = mount_sse_sequence(
&server,
vec![sse(vec![
ev_response_created("resp-denied"),
ev_assistant_message("msg-denied", &denial),
ev_completed("resp-denied"),
])],
)
.await;
let (session, turn) = guardian_test_session_and_turn(&server).await;
seed_guardian_parent_history(&session, &turn).await;
let decision = review_approval_request(
&session,
&turn,
"review-valid-denial".to_string(),
guardian_shell_request("shell-valid-denial"),
/*retry_reason*/ None,
)
.await;
assert_eq!(decision, ReviewDecision::Denied);
assert_eq!(request_log.requests().len(), 1);
Ok(())
}
#[tokio::test]
-async fn guardian_parallel_reviews_fork_from_last_committed_trunk_history() -> anyhow::Result<()> {
+async fn guardian_ephemeral_retry_preserves_parallel_trunk_and_fork_history() -> anyhow::Result<()>
+{
const TEST_STACK_SIZE_BYTES: usize = 4 * 1024 * 1024;
let handle =
std::thread::Builder::new()
-.name("guardian_parallel_reviews_fork_from_last_committed_trunk_history".to_string())
+.name("guardian_ephemeral_retry_preserves_parallel_trunk_and_fork_history".to_string())
.stack_size(TEST_STACK_SIZE_BYTES)
.spawn(|| -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
@@ -2141,10 +2439,18 @@ async fn guardian_parallel_reviews_fork_from_last_committed_trunk_history() -> a
gate: None,
body: sse(vec![
ev_response_created("resp-guardian-3"),
-ev_assistant_message("msg-guardian-3", &third_assessment),
+ev_assistant_message("msg-guardian-3", "not valid guardian json"),
ev_completed("resp-guardian-3"),
]),
}],
+vec![StreamingSseChunk {
+gate: None,
+body: sse(vec![
+ev_response_created("resp-guardian-4"),
+ev_assistant_message("msg-guardian-4", &third_assessment),
+ev_completed("resp-guardian-4"),
+]),
+}],
])
.await;
@@ -2271,20 +2577,28 @@ async fn guardian_parallel_reviews_fork_from_last_committed_trunk_history() -> a
.await;
assert_eq!(third_decision, ReviewDecision::Approved);
let requests = server.requests().await;
-assert_eq!(requests.len(), 3);
+assert_eq!(requests.len(), 4);
let second_request_body = serde_json::from_slice::<serde_json::Value>(&requests[1])?;
-let third_request_body = serde_json::from_slice::<serde_json::Value>(&requests[2])?;
+let failed_ephemeral_request_body =
+serde_json::from_slice::<serde_json::Value>(&requests[2])?;
+let retried_ephemeral_request_body =
+serde_json::from_slice::<serde_json::Value>(&requests[3])?;
assert_eq!(
second_request_body["prompt_cache_key"],
-third_request_body["prompt_cache_key"],
+failed_ephemeral_request_body["prompt_cache_key"],
"forked guardian review should reuse the trunk guardian prompt cache key"
);
-let third_request_body_text = third_request_body.to_string();
+assert_eq!(
+failed_ephemeral_request_body["prompt_cache_key"],
+retried_ephemeral_request_body["prompt_cache_key"],
+"retried ephemeral review should preserve the guardian prompt cache key"
+);
+let third_request_body_text = retried_ephemeral_request_body.to_string();
assert!(
third_request_body_text.contains("first guardian rationale"),
"forked guardian review should include the last committed trunk assessment"
);
-let third_user_message = last_user_message_text_from_body(&third_request_body);
+let third_user_message = last_user_message_text_from_body(&retried_ephemeral_request_body);
assert!(third_user_message.contains(">>> TRANSCRIPT DELTA START\n"));
assert!(
third_user_message.contains("[5] user: Please inspect pending changes before pushing.")
@@ -2315,7 +2629,7 @@ async fn guardian_parallel_reviews_fork_from_last_committed_trunk_history() -> a
match handle.join() {
Ok(result) => result,
Err(_) => Err(anyhow::anyhow!(
-"guardian_parallel_reviews_fork_from_last_committed_trunk_history thread panicked"
+"guardian_ephemeral_retry_preserves_parallel_trunk_and_fork_history thread panicked"
)),
}
}
@@ -2541,6 +2855,10 @@ async fn guardian_review_session_config_keeps_bedrock_provider_for_bedrock_gpt_5
)
.expect("guardian config");
+let mut expected_model_provider =
+ModelProviderInfo::create_amazon_bedrock_provider(/*aws*/ None);
+expected_model_provider.request_max_retries = Some(1);
+expected_model_provider.stream_max_retries = Some(1);
assert_eq!(
(
guardian_config.model,
@@ -2550,7 +2868,7 @@ async fn guardian_review_session_config_keeps_bedrock_provider_for_bedrock_gpt_5
(
Some(AMAZON_BEDROCK_GPT_5_4_MODEL_ID.to_string()),
AMAZON_BEDROCK_PROVIDER_ID.to_string(),
-ModelProviderInfo::create_amazon_bedrock_provider(/*aws*/ None),
+expected_model_provider,
)
);
}