diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 02779c660..b6af11766 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -1937,6 +1937,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() { decision: GuardianReviewDecision::Denied, terminal_status: GuardianReviewTerminalStatus::TimedOut, failure_reason: Some(GuardianReviewFailureReason::Timeout), + attempt_count: 1, risk_level: None, user_authorization: None, outcome: None, @@ -2008,6 +2009,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() { ); assert_eq!(payload[0]["event_params"]["terminal_status"], "timed_out"); assert_eq!(payload[0]["event_params"]["failure_reason"], "timeout"); + assert_eq!(payload[0]["event_params"]["attempt_count"], 1); assert_eq!(payload[0]["event_params"]["review_timeout_ms"], 90_000); } diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs index 8a25215ec..4c7277b22 100644 --- a/codex-rs/analytics/src/events.rs +++ b/codex-rs/analytics/src/events.rs @@ -266,6 +266,7 @@ pub struct GuardianReviewEventParams { pub decision: GuardianReviewDecision, pub terminal_status: GuardianReviewTerminalStatus, pub failure_reason: Option, + pub attempt_count: i64, pub risk_level: Option, pub user_authorization: Option, pub outcome: Option, @@ -338,6 +339,7 @@ impl GuardianReviewTrackContext { decision: result.decision, terminal_status: result.terminal_status, failure_reason: result.failure_reason, + attempt_count: result.attempt_count, risk_level: result.risk_level, user_authorization: result.user_authorization, outcome: result.outcome, @@ -373,6 +375,7 @@ pub struct GuardianReviewAnalyticsResult { pub decision: GuardianReviewDecision, pub terminal_status: GuardianReviewTerminalStatus, pub failure_reason: Option, + pub attempt_count: i64, pub risk_level: Option, pub 
user_authorization: Option, pub outcome: Option, @@ -392,6 +395,7 @@ impl GuardianReviewAnalyticsResult { decision: GuardianReviewDecision::Denied, terminal_status: GuardianReviewTerminalStatus::FailedClosed, failure_reason: None, + attempt_count: 1, risk_level: None, user_authorization: None, outcome: None, diff --git a/codex-rs/core/src/guardian/mod.rs b/codex-rs/core/src/guardian/mod.rs index b4920f1ff..b06a2daad 100644 --- a/codex-rs/core/src/guardian/mod.rs +++ b/codex-rs/core/src/guardian/mod.rs @@ -168,7 +168,7 @@ use prompt::render_guardian_transcript_entries; #[cfg(test)] use review::GuardianReviewOutcome; #[cfg(test)] -use review::run_guardian_review_session as run_guardian_review_session_for_test; +use review::run_guardian_review_session_with_retry as run_guardian_review_session_for_test; #[cfg(test)] use review_session::build_guardian_review_session_config as build_guardian_review_session_config_for_test; diff --git a/codex-rs/core/src/guardian/review.rs b/codex-rs/core/src/guardian/review.rs index 934e68042..99f9c5d99 100644 --- a/codex-rs/core/src/guardian/review.rs +++ b/codex-rs/core/src/guardian/review.rs @@ -7,6 +7,7 @@ use codex_analytics::GuardianReviewTrackContext; use codex_analytics::GuardianReviewedAction; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::GuardianAssessmentDecisionSource; use codex_protocol::protocol::GuardianAssessmentEvent; @@ -19,11 +20,14 @@ use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::WarningEvent; use std::sync::Arc; use tokio::sync::oneshot; +use tokio::time::Instant; +use tokio::time::sleep_until; use tokio_util::sync::CancellationToken; use crate::session::session::Session; use crate::session::turn_context::TurnContext; use crate::turn_timing::now_unix_timestamp_ms; +use crate::util::backoff; use 
super::AUTO_REVIEW_DENIAL_WINDOW_SIZE; use super::GUARDIAN_REVIEW_TIMEOUT; @@ -58,6 +62,8 @@ const GUARDIAN_TIMEOUT_INSTRUCTIONS: &str = concat!( "You may retry once, or ask the user for guidance or explicit approval.", ); +const GUARDIAN_REVIEW_MAX_ATTEMPTS: i64 = 3; + pub(crate) fn new_guardian_review_id() -> String { uuid::Uuid::new_v4().to_string() } @@ -95,9 +101,16 @@ pub(super) enum GuardianReviewOutcome { #[derive(Debug)] pub(super) enum GuardianReviewError { - PromptBuild { message: String }, - Session { message: String }, - Parse { message: String }, + PromptBuild { + message: String, + }, + Session { + message: String, + error_info: Option, + }, + Parse { + message: String, + }, Timeout, Cancelled, } @@ -112,6 +125,14 @@ impl GuardianReviewError { fn session(err: anyhow::Error) -> Self { Self::Session { message: err.to_string(), + error_info: None, + } + } + + fn session_with_error_info(err: anyhow::Error, error_info: CodexErrorInfo) -> Self { + Self::Session { + message: err.to_string(), + error_info: Some(error_info), } } @@ -335,13 +356,14 @@ async fn run_guardian_review( let schema = guardian_output_schema(); let terminal_action = action_summary.clone(); - let (outcome, analytics_result) = Box::pin(run_guardian_review_session( + let (outcome, analytics_result) = Box::pin(run_guardian_review_session_with_retry( session.clone(), turn.clone(), request, retry_reason.clone(), schema, external_cancel, + GUARDIAN_REVIEW_MAX_ATTEMPTS, )) .await; @@ -464,7 +486,7 @@ async fn run_guardian_review( | GuardianReviewError::Parse { .. } => { let message = match &error { GuardianReviewError::PromptBuild { message } - | GuardianReviewError::Session { message } + | GuardianReviewError::Session { message, .. } | GuardianReviewError::Parse { message } => message, GuardianReviewError::Timeout | GuardianReviewError::Cancelled => { "guardian review failed" @@ -657,13 +679,14 @@ pub(crate) fn spawn_approval_request_review( /// context. 
It may still reuse the parent's managed-network allowlist for /// read-only checks, but it intentionally runs without inherited exec-policy /// rules. -pub(super) async fn run_guardian_review_session( +async fn run_guardian_review_session_before_deadline( session: Arc, turn: Arc, request: GuardianApprovalRequest, retry_reason: Option, schema: serde_json::Value, external_cancel: Option, + deadline: Instant, ) -> (GuardianReviewOutcome, GuardianReviewAnalyticsResult) { let network_proxy = session.services.network_proxy.load_full(); let live_network_config = match network_proxy.as_ref() { @@ -753,6 +776,7 @@ pub(super) async fn run_guardian_review_session( reasoning_summary: turn.reasoning_summary, personality: turn.personality, external_cancel, + deadline, }), ) .await; @@ -787,10 +811,16 @@ pub(super) async fn run_guardian_review_session( GuardianReviewOutcome::Error(GuardianReviewError::prompt_build(err)), session_analytics_result, ), - GuardianReviewSessionOutcome::SessionFailed(err) => ( - GuardianReviewOutcome::Error(GuardianReviewError::session(err)), - session_analytics_result, - ), + GuardianReviewSessionOutcome::SessionFailed { error, error_info } => { + let error = match error_info { + Some(error_info) => GuardianReviewError::session_with_error_info(error, error_info), + None => GuardianReviewError::session(error), + }; + ( + GuardianReviewOutcome::Error(error), + session_analytics_result, + ) + } GuardianReviewSessionOutcome::TimedOut => ( GuardianReviewOutcome::Error(GuardianReviewError::Timeout), session_analytics_result, @@ -802,9 +832,85 @@ pub(super) async fn run_guardian_review_session( } } +pub(super) async fn run_guardian_review_session_with_retry( + session: Arc, + turn: Arc, + request: GuardianApprovalRequest, + retry_reason: Option, + schema: serde_json::Value, + external_cancel: Option, + max_attempts: i64, +) -> (GuardianReviewOutcome, GuardianReviewAnalyticsResult) { + assert!(max_attempts > 0, "guardian review must run at least once"); + let 
deadline = Instant::now() + GUARDIAN_REVIEW_TIMEOUT; + let mut attempt_count = 1; + loop { + let (outcome, mut analytics_result) = run_guardian_review_session_before_deadline( + Arc::clone(&session), + Arc::clone(&turn), + request.clone(), + retry_reason.clone(), + schema.clone(), + external_cancel.clone(), + deadline, + ) + .await; + analytics_result.attempt_count = attempt_count; + if attempt_count >= max_attempts || !should_retry_guardian_review(&outcome) { + return (outcome, analytics_result); + } + if let Some(error) = + wait_before_guardian_retry(attempt_count, deadline, external_cancel.as_ref()).await + { + return (GuardianReviewOutcome::Error(error), analytics_result); + } + attempt_count += 1; + } +} + +async fn wait_before_guardian_retry( + attempt_count: i64, + deadline: Instant, + external_cancel: Option<&CancellationToken>, +) -> Option { + let retry_delay = backoff(attempt_count as u64); + let retry_at = (Instant::now() + retry_delay).min(deadline); + tokio::select! { + _ = sleep_until(retry_at) => { + (Instant::now() >= deadline).then_some(GuardianReviewError::Timeout) + } + _ = async { + if let Some(cancel_token) = external_cancel { + cancel_token.cancelled().await; + } else { + std::future::pending::<()>().await; + } + } => Some(GuardianReviewError::Cancelled), + } +} + +fn should_retry_guardian_review(outcome: &GuardianReviewOutcome) -> bool { + matches!( + outcome, + GuardianReviewOutcome::Error( + GuardianReviewError::Session { + error_info: Some( + CodexErrorInfo::ServerOverloaded + | CodexErrorInfo::HttpConnectionFailed { .. } + | CodexErrorInfo::ResponseStreamConnectionFailed { .. } + | CodexErrorInfo::InternalServerError + | CodexErrorInfo::ResponseStreamDisconnected { .. } + ), + .. + } | GuardianReviewError::Parse { .. 
} + ) + ) +} + #[cfg(test)] mod review_tests { use super::*; + use std::time::Duration; #[test] fn guardian_review_error_reason_distinguishes_error_kinds() { @@ -812,6 +918,10 @@ mod review_tests { let prompt_error = GuardianReviewError::prompt_build(anyhow::anyhow!("bad prompt/config")); let session_error = GuardianReviewError::session(anyhow::anyhow!("guardian runtime failed")); + let structured_session_error = GuardianReviewError::session_with_error_info( + anyhow::anyhow!("temporary guardian failure"), + CodexErrorInfo::ServerOverloaded, + ); assert!(matches!( parse_error.failure_reason(), @@ -825,5 +935,109 @@ mod review_tests { session_error.failure_reason(), GuardianReviewFailureReason::SessionError )); + assert!(matches!( + structured_session_error.failure_reason(), + GuardianReviewFailureReason::SessionError + )); + } + + #[test] + fn guardian_review_retry_only_retries_transient_session_and_parse_errors() { + let assessment = GuardianAssessment { + risk_level: GuardianRiskLevel::High, + user_authorization: GuardianUserAuthorization::Unknown, + outcome: GuardianAssessmentOutcome::Deny, + rationale: "deny".to_string(), + }; + let transient_error_info = [ + CodexErrorInfo::ServerOverloaded, + CodexErrorInfo::HttpConnectionFailed { + http_status_code: Some(502), + }, + CodexErrorInfo::ResponseStreamConnectionFailed { + http_status_code: Some(503), + }, + CodexErrorInfo::InternalServerError, + CodexErrorInfo::ResponseStreamDisconnected { + http_status_code: None, + }, + ]; + let mut outcomes = transient_error_info + .into_iter() + .map(|error_info| { + ( + GuardianReviewOutcome::Error(GuardianReviewError::session_with_error_info( + anyhow::anyhow!("transient session"), + error_info, + )), + true, + ) + }) + .collect::>(); + outcomes.extend([ + (GuardianReviewOutcome::Completed(assessment), false), + ( + GuardianReviewOutcome::Error(GuardianReviewError::prompt_build(anyhow::anyhow!( + "prompt" + ))), + false, + ), + ( + 
GuardianReviewOutcome::Error(GuardianReviewError::session(anyhow::anyhow!( + "session" + ))), + false, + ), + ( + GuardianReviewOutcome::Error(GuardianReviewError::session_with_error_info( + anyhow::anyhow!("bad request"), + CodexErrorInfo::BadRequest, + )), + false, + ), + ( + GuardianReviewOutcome::Error(GuardianReviewError::parse(anyhow::anyhow!("parse"))), + true, + ), + ( + GuardianReviewOutcome::Error(GuardianReviewError::Timeout), + false, + ), + ( + GuardianReviewOutcome::Error(GuardianReviewError::Cancelled), + false, + ), + ]); + + for (outcome, expected) in outcomes { + assert_eq!(should_retry_guardian_review(&outcome), expected); + } + } + + #[tokio::test] + async fn guardian_review_retry_wait_honors_cancellation() { + let cancel_token = CancellationToken::new(); + cancel_token.cancel(); + + let error = wait_before_guardian_retry( + /*attempt_count*/ 1, + Instant::now() + Duration::from_secs(/*secs*/ 1), + Some(&cancel_token), + ) + .await; + + assert!(matches!(error, Some(GuardianReviewError::Cancelled))); + } + + #[tokio::test] + async fn guardian_review_retry_wait_honors_deadline() { + let error = wait_before_guardian_retry( + /*attempt_count*/ 1, + Instant::now(), + /*external_cancel*/ None, + ) + .await; + + assert!(matches!(error, Some(GuardianReviewError::Timeout))); } } diff --git a/codex-rs/core/src/guardian/review_session.rs b/codex-rs/core/src/guardian/review_session.rs index fcda9a5a2..54282c1c0 100644 --- a/codex-rs/core/src/guardian/review_session.rs +++ b/codex-rs/core/src/guardian/review_session.rs @@ -15,6 +15,8 @@ use codex_protocol::models::PermissionProfile; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::CodexErrorInfo; +use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::InitialHistory; @@ 
-46,7 +48,6 @@ use codex_features::Feature; use codex_model_provider_info::ModelProviderInfo; use codex_utils_absolute_path::AbsolutePathBuf; -use super::GUARDIAN_REVIEW_TIMEOUT; use super::GUARDIAN_REVIEWER_NAME; use super::GuardianApprovalRequest; use super::prompt::GuardianPromptMode; @@ -60,7 +61,10 @@ const GUARDIAN_INTERRUPT_DRAIN_TIMEOUT: Duration = Duration::from_secs(5); pub(crate) enum GuardianReviewSessionOutcome { Completed(anyhow::Result>), PromptBuildFailed(anyhow::Error), - SessionFailed(anyhow::Error), + SessionFailed { + error: anyhow::Error, + error_info: Option, + }, TimedOut, Aborted, } @@ -77,6 +81,7 @@ pub(crate) struct GuardianReviewSessionParams { pub(crate) reasoning_summary: ReasoningSummaryConfig, pub(crate) personality: Option, pub(crate) external_cancel: Option, + pub(crate) deadline: tokio::time::Instant, } #[derive(Default)] @@ -312,7 +317,7 @@ impl GuardianReviewSessionManager { &self, params: GuardianReviewSessionParams, ) -> (GuardianReviewSessionOutcome, GuardianReviewAnalyticsResult) { - let deadline = tokio::time::Instant::now() + GUARDIAN_REVIEW_TIMEOUT; + let deadline = params.deadline; let next_reuse_key = GuardianReviewSessionReuseKey::from_spawn_config(¶ms.spawn_config); let mut stale_trunk_to_shutdown = None; let mut spawned_trunk = false; @@ -364,7 +369,9 @@ impl GuardianReviewSessionManager { state.trunk.as_ref().cloned() } - Err(outcome) => return (outcome, GuardianReviewAnalyticsResult::without_session()), + Err(outcome) => { + return (outcome, GuardianReviewAnalyticsResult::without_session()); + } }; if let Some(review_session) = stale_trunk_to_shutdown { @@ -560,7 +567,9 @@ impl GuardianReviewSessionManager { GuardianReviewAnalyticsResult::without_session(), ); } - Err(outcome) => return (outcome, GuardianReviewAnalyticsResult::without_session()), + Err(outcome) => { + return (outcome, GuardianReviewAnalyticsResult::without_session()); + } }; self.register_active_ephemeral(Arc::clone(&review_session)) .await; @@ 
-764,7 +773,10 @@ async fn run_review_on_session( Ok(Ok(child_turn_id)) => child_turn_id, Ok(Err(err)) => { return ( - GuardianReviewSessionOutcome::SessionFailed(err.into()), + GuardianReviewSessionOutcome::SessionFailed { + error: err.into(), + error_info: None, + }, false, analytics_result, ); @@ -825,7 +837,7 @@ async fn wait_for_guardian_review( ) -> (GuardianReviewSessionOutcome, bool, bool) { let timeout = tokio::time::sleep_until(deadline); tokio::pin!(timeout); - let mut last_error_message: Option = None; + let mut last_error: Option = None; loop { tokio::select! { @@ -862,10 +874,13 @@ async fn wait_for_guardian_review( .time_to_first_token_ms .and_then(|ms| u64::try_from(ms).ok()); if turn_complete.last_agent_message.is_none() - && let Some(error_message) = last_error_message + && let Some(error) = last_error { return ( - GuardianReviewSessionOutcome::Completed(Err(anyhow!(error_message))), + GuardianReviewSessionOutcome::SessionFailed { + error: anyhow!(error.message), + error_info: error.codex_error_info, + }, true, true, ); @@ -877,7 +892,7 @@ async fn wait_for_guardian_review( ); } EventMsg::Error(error) => { - last_error_message = Some(error.message); + last_error = Some(error); } EventMsg::TurnAborted(_) => { return (GuardianReviewSessionOutcome::Aborted, true, false); @@ -920,6 +935,8 @@ pub(crate) fn build_guardian_review_session_config( let mut guardian_config = parent_config.clone(); guardian_config.model = Some(active_model.to_string()); guardian_config.model_reasoning_effort = reasoning_effort; + guardian_config.model_provider.request_max_retries = Some(1); + guardian_config.model_provider.stream_max_retries = Some(1); guardian_config.include_skill_instructions = false; guardian_config.base_instructions = Some( parent_config @@ -1148,6 +1165,7 @@ mod tests { reasoning_summary, personality, external_cancel: None, + deadline: tokio::time::Instant::now() + Duration::from_secs(30), } } @@ -1473,6 +1491,29 @@ mod tests { 
assert!(keep_review_session); } + #[tokio::test] + async fn run_review_removes_trunk_when_event_stream_is_broken() { + let (mut review_session, tx_event, _rx_sub) = test_review_session().await; + let params = test_review_params().await; + review_session.reuse_key = + GuardianReviewSessionReuseKey::from_spawn_config(¶ms.spawn_config); + let manager = GuardianReviewSessionManager { + state: Arc::new(Mutex::new(GuardianReviewSessionState { + trunk: Some(Arc::new(review_session)), + ephemeral_reviews: Vec::new(), + })), + }; + drop(tx_event); + + let (outcome, _) = manager.run_review(params).await; + + assert!(matches!( + outcome, + GuardianReviewSessionOutcome::Completed(Err(_)) + )); + assert!(manager.state.lock().await.trunk.is_none()); + } + #[tokio::test] async fn wait_for_guardian_review_ignores_prior_turn_completion() { let (review_session, tx_event, _rx_sub) = test_review_session().await; @@ -1545,6 +1586,47 @@ mod tests { assert!(capture_token_usage); } + #[tokio::test] + async fn wait_for_guardian_review_preserves_structured_session_error() { + let (review_session, tx_event, _rx_sub) = test_review_session().await; + tx_event + .send(Event { + id: "current-turn".to_string(), + msg: EventMsg::Error(ErrorEvent { + message: "temporary failure".to_string(), + codex_error_info: Some(CodexErrorInfo::ServerOverloaded), + }), + }) + .await + .expect("queue guardian error"); + tx_event + .send(turn_complete_event( + "current-turn", + /*last_agent_message*/ None, + Some(42), + )) + .await + .expect("queue current turn completion"); + + let mut analytics_result = GuardianReviewAnalyticsResult::without_session(); + let (outcome, keep_review_session, capture_token_usage) = wait_for_guardian_review( + &review_session, + "current-turn", + tokio::time::Instant::now() + Duration::from_secs(1), + /*external_cancel*/ None, + &mut analytics_result, + ) + .await; + + let GuardianReviewSessionOutcome::SessionFailed { error, error_info } = outcome else { + panic!("expected 
structured session failure"); + }; + assert_eq!(error.to_string(), "temporary failure"); + assert_eq!(error_info, Some(CodexErrorInfo::ServerOverloaded)); + assert!(keep_review_session); + assert!(capture_token_usage); + } + #[tokio::test] async fn wait_for_guardian_review_ignores_prior_turn_aborts() { let (review_session, tx_event, _rx_sub) = test_review_session().await; diff --git a/codex-rs/core/src/guardian/tests.rs b/codex-rs/core/src/guardian/tests.rs index ffadd801c..8f6e01298 100644 --- a/codex-rs/core/src/guardian/tests.rs +++ b/codex-rs/core/src/guardian/tests.rs @@ -55,10 +55,11 @@ use core_test_support::context_snapshot::ContextSnapshotOptions; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_response_created; -use core_test_support::responses::mount_response_once; +use core_test_support::responses::mount_response_sequence; use core_test_support::responses::mount_sse_once; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; +use core_test_support::responses::sse_failed; use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::streaming_sse::StreamingSseChunk; @@ -172,6 +173,51 @@ async fn guardian_test_session_and_turn( guardian_test_session_and_turn_with_base_url(server.uri().as_str()).await } +async fn guardian_test_session_turn_and_rx( + server: &wiremock::MockServer, +) -> ( + Arc, + Arc, + async_channel::Receiver, +) { + let (mut session, mut turn, rx) = + crate::session::tests::make_session_and_context_with_rx().await; + Arc::get_mut(&mut session) + .expect("session should be uniquely owned") + .thread_id = fixed_guardian_parent_session_id(); + let mut config = (*turn.config).clone(); + config.model_provider.base_url = Some(format!("{}/v1", server.uri())); + config.user_instructions = None; + let config = Arc::new(config); + let models_manager = 
test_support::models_manager_with_provider( + config.codex_home.to_path_buf(), + Arc::clone(&session.services.auth_manager), + config.model_provider.clone(), + ); + Arc::get_mut(&mut session) + .expect("session should be uniquely owned") + .services + .models_manager = models_manager; + let turn_mut = Arc::get_mut(&mut turn).expect("turn should be uniquely owned"); + turn_mut.config = Arc::clone(&config); + turn_mut.provider = + create_model_provider(config.model_provider.clone(), turn_mut.auth_manager.clone()); + turn_mut.user_instructions = None; + + (session, turn, rx) +} + +fn guardian_shell_request(id: &str) -> GuardianApprovalRequest { + GuardianApprovalRequest::Shell { + id: id.to_string(), + command: vec!["git".to_string(), "push".to_string()], + cwd: test_path_buf("/repo/codex-rs/core").abs(), + sandbox_permissions: crate::sandboxing::SandboxPermissions::UseDefault, + additional_permissions: None, + justification: Some("Need to push the reviewed docs fix.".to_string()), + } +} + async fn guardian_test_session_and_turn_with_base_url( base_url: &str, ) -> (Arc, Arc) { @@ -1343,6 +1389,7 @@ async fn guardian_request_model_for_auto_review_override( Some("Sandbox denied outbound git push to github.com.".to_string()), guardian_output_schema(), /*external_cancel*/ None, + /*max_attempts*/ 1, ) .await; let (GuardianReviewOutcome::Completed(_), _) = outcome else { @@ -1454,6 +1501,7 @@ async fn guardian_review_request_layout_matches_model_visible_request_snapshot() Some("Sandbox denied outbound git push to github.com.".to_string()), guardian_output_schema(), /*external_cancel*/ None, + /*max_attempts*/ 1, ) .await; let (GuardianReviewOutcome::Completed(assessment), metadata) = outcome else { @@ -1632,6 +1680,7 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow: Some("First retry reason".to_string()), guardian_output_schema(), /*external_cancel*/ None, + /*max_attempts*/ 1, ) .await; session @@ -1676,6 +1725,7 @@ async fn 
guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow: Some("Second retry reason".to_string()), guardian_output_schema(), /*external_cancel*/ None, + /*max_attempts*/ 1, ) .await; session @@ -1716,6 +1766,7 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow: Some("Third retry reason".to_string()), guardian_output_schema(), /*external_cancel*/ None, + /*max_attempts*/ 1, ) .await; @@ -1907,6 +1958,7 @@ async fn guardian_reused_trunk_ignores_stale_prior_turn_completion() -> anyhow:: /*retry_reason*/ None, guardian_output_schema(), /*external_cancel*/ None, + /*max_attempts*/ 1, ) .await; let (GuardianReviewOutcome::Completed(first_assessment), first_metadata) = first_outcome else { @@ -1949,6 +2001,7 @@ async fn guardian_reused_trunk_ignores_stale_prior_turn_completion() -> anyhow:: /*retry_reason*/ None, guardian_output_schema(), /*external_cancel*/ None, + /*max_attempts*/ 1, ) .await; let (GuardianReviewOutcome::Completed(second_assessment), second_metadata) = second_outcome @@ -1978,15 +2031,17 @@ async fn guardian_review_surfaces_responses_api_errors_in_rejection_reason() -> let server = start_mock_server().await; let error_message = "Item 'rs_test' of type 'reasoning' was provided without its required following item."; - let _request_log = mount_response_once( + let request_log = mount_response_sequence( &server, - wiremock::ResponseTemplate::new(400).set_body_json(serde_json::json!({ - "error": { - "message": error_message, - "type": "invalid_request_error", - "param": "input" - } - })), + vec![ + wiremock::ResponseTemplate::new(400).set_body_json(serde_json::json!({ + "error": { + "message": error_message, + "type": "invalid_request_error", + "param": "input" + } + })), + ], ) .await; @@ -2030,6 +2085,7 @@ async fn guardian_review_surfaces_responses_api_errors_in_rejection_reason() -> .await; assert_eq!(decision, ReviewDecision::Denied); + assert_eq!(request_log.requests().len(), 1); let mut warnings = 
Vec::new(); let mut denial_rationales = Vec::new(); @@ -2080,13 +2136,255 @@ async fn guardian_review_surfaces_responses_api_errors_in_rejection_reason() -> Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn guardian_review_retries_transient_session_failure_then_approves() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let approval = serde_json::json!({ + "risk_level": "low", + "user_authorization": "high", + "outcome": "allow", + "rationale": "retry succeeded", + }) + .to_string(); + let request_log = mount_sse_sequence( + &server, + vec![ + sse_failed( + "resp-session-failure", + "server_is_overloaded", + "temporary reviewer overload", + ), + sse(vec![ + ev_response_created("resp-approved"), + ev_assistant_message("msg-approved", &approval), + ev_completed("resp-approved"), + ]), + ], + ) + .await; + let (session, turn) = guardian_test_session_and_turn(&server).await; + seed_guardian_parent_history(&session, &turn).await; + + let (outcome, metadata) = run_guardian_review_session_for_test( + Arc::clone(&session), + Arc::clone(&turn), + guardian_shell_request("shell-session-retry"), + /*retry_reason*/ None, + guardian_output_schema(), + /*external_cancel*/ None, + /*max_attempts*/ 3, + ) + .await; + + let GuardianReviewOutcome::Completed(assessment) = outcome else { + panic!("expected guardian assessment"); + }; + assert_eq!(assessment.outcome, GuardianAssessmentOutcome::Allow); + assert_eq!(assessment.rationale, "retry succeeded"); + assert_eq!(metadata.attempt_count, 2); + assert!(matches!( + metadata.guardian_session_kind, + Some(codex_analytics::GuardianReviewSessionKind::TrunkReused) + )); + assert_eq!(request_log.requests().len(), 2); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn guardian_review_does_not_retry_missing_assessment_payload() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + 
let request_log = mount_sse_sequence( + &server, + vec![sse(vec![ + ev_response_created("resp-missing-assessment"), + ev_completed("resp-missing-assessment"), + ])], + ) + .await; + let (session, turn) = guardian_test_session_and_turn(&server).await; + seed_guardian_parent_history(&session, &turn).await; + + let decision = review_approval_request( + &session, + &turn, + "review-missing-assessment".to_string(), + guardian_shell_request("shell-missing-assessment"), + /*retry_reason*/ None, + ) + .await; + + assert_eq!(decision, ReviewDecision::Denied); + assert_eq!(request_log.requests().len(), 1); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn guardian_review_retries_two_parse_failures_then_approves() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let approval = serde_json::json!({ + "risk_level": "low", + "user_authorization": "high", + "outcome": "allow", + "rationale": "retry succeeded", + }) + .to_string(); + let request_log = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-parse-failure-1"), + ev_assistant_message("msg-parse-failure-1", "not valid guardian json"), + ev_completed("resp-parse-failure-1"), + ]), + sse(vec![ + ev_response_created("resp-parse-failure-2"), + ev_assistant_message("msg-parse-failure-2", "still not valid guardian json"), + ev_completed("resp-parse-failure-2"), + ]), + sse(vec![ + ev_response_created("resp-approved"), + ev_assistant_message("msg-approved", &approval), + ev_completed("resp-approved"), + ]), + ], + ) + .await; + let (session, turn) = guardian_test_session_and_turn(&server).await; + seed_guardian_parent_history(&session, &turn).await; + + let (outcome, metadata) = run_guardian_review_session_for_test( + Arc::clone(&session), + Arc::clone(&turn), + guardian_shell_request("shell-parse-retry"), + /*retry_reason*/ None, + guardian_output_schema(), + /*external_cancel*/ None, + /*max_attempts*/ 3, + ) + 
.await; + + let GuardianReviewOutcome::Completed(assessment) = outcome else { + panic!("expected guardian assessment"); + }; + assert_eq!(assessment.outcome, GuardianAssessmentOutcome::Allow); + assert_eq!(assessment.rationale, "retry succeeded"); + assert_eq!(metadata.attempt_count, 3); + assert!(matches!( + metadata.guardian_session_kind, + Some(codex_analytics::GuardianReviewSessionKind::TrunkReused) + )); + assert_eq!(request_log.requests().len(), 3); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn guardian_review_exhausts_three_failures_with_one_terminal_event() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let request_log = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-parse-failure-1"), + ev_assistant_message("msg-parse-failure-1", "invalid one"), + ev_completed("resp-parse-failure-1"), + ]), + sse(vec![ + ev_response_created("resp-parse-failure-2"), + ev_assistant_message("msg-parse-failure-2", "invalid two"), + ev_completed("resp-parse-failure-2"), + ]), + sse(vec![ + ev_response_created("resp-parse-failure-3"), + ev_assistant_message("msg-parse-failure-3", "invalid three"), + ev_completed("resp-parse-failure-3"), + ]), + ], + ) + .await; + let (session, turn, rx) = guardian_test_session_turn_and_rx(&server).await; + seed_guardian_parent_history(&session, &turn).await; + + let decision = review_approval_request( + &session, + &turn, + "review-exhausted-retry".to_string(), + guardian_shell_request("shell-exhausted-retry"), + /*retry_reason*/ None, + ) + .await; + + assert_eq!(decision, ReviewDecision::Denied); + assert_eq!(request_log.requests().len(), 3); + let mut statuses = Vec::new(); + while let Ok(event) = rx.try_recv() { + if let EventMsg::GuardianAssessment(event) = event.msg { + statuses.push(event.status); + } + } + assert_eq!( + statuses, + vec![ + GuardianAssessmentStatus::InProgress, + GuardianAssessmentStatus::Denied, + 
] + ); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn guardian_review_does_not_retry_valid_denial() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let denial = serde_json::json!({ + "risk_level": "high", + "user_authorization": "unknown", + "outcome": "deny", + "rationale": "unsafe", + }) + .to_string(); + let request_log = mount_sse_sequence( + &server, + vec![sse(vec![ + ev_response_created("resp-denied"), + ev_assistant_message("msg-denied", &denial), + ev_completed("resp-denied"), + ])], + ) + .await; + let (session, turn) = guardian_test_session_and_turn(&server).await; + seed_guardian_parent_history(&session, &turn).await; + + let decision = review_approval_request( + &session, + &turn, + "review-valid-denial".to_string(), + guardian_shell_request("shell-valid-denial"), + /*retry_reason*/ None, + ) + .await; + + assert_eq!(decision, ReviewDecision::Denied); + assert_eq!(request_log.requests().len(), 1); + Ok(()) +} + #[tokio::test] -async fn guardian_parallel_reviews_fork_from_last_committed_trunk_history() -> anyhow::Result<()> { +async fn guardian_ephemeral_retry_preserves_parallel_trunk_and_fork_history() -> anyhow::Result<()> +{ const TEST_STACK_SIZE_BYTES: usize = 4 * 1024 * 1024; let handle = std::thread::Builder::new() - .name("guardian_parallel_reviews_fork_from_last_committed_trunk_history".to_string()) + .name("guardian_ephemeral_retry_preserves_parallel_trunk_and_fork_history".to_string()) .stack_size(TEST_STACK_SIZE_BYTES) .spawn(|| -> anyhow::Result<()> { let runtime = tokio::runtime::Builder::new_current_thread() @@ -2141,10 +2439,18 @@ async fn guardian_parallel_reviews_fork_from_last_committed_trunk_history() -> a gate: None, body: sse(vec![ ev_response_created("resp-guardian-3"), - ev_assistant_message("msg-guardian-3", &third_assessment), + ev_assistant_message("msg-guardian-3", "not valid guardian json"), ev_completed("resp-guardian-3"), ]), }], + 
vec![StreamingSseChunk { + gate: None, + body: sse(vec![ + ev_response_created("resp-guardian-4"), + ev_assistant_message("msg-guardian-4", &third_assessment), + ev_completed("resp-guardian-4"), + ]), + }], ]) .await; @@ -2271,20 +2577,28 @@ async fn guardian_parallel_reviews_fork_from_last_committed_trunk_history() -> a .await; assert_eq!(third_decision, ReviewDecision::Approved); let requests = server.requests().await; - assert_eq!(requests.len(), 3); + assert_eq!(requests.len(), 4); let second_request_body = serde_json::from_slice::<serde_json::Value>(&requests[1])?; - let third_request_body = serde_json::from_slice::<serde_json::Value>(&requests[2])?; + let failed_ephemeral_request_body = + serde_json::from_slice::<serde_json::Value>(&requests[2])?; + let retried_ephemeral_request_body = + serde_json::from_slice::<serde_json::Value>(&requests[3])?; assert_eq!( second_request_body["prompt_cache_key"], - third_request_body["prompt_cache_key"], + failed_ephemeral_request_body["prompt_cache_key"], "forked guardian review should reuse the trunk guardian prompt cache key" ); - let third_request_body_text = third_request_body.to_string(); + assert_eq!( + failed_ephemeral_request_body["prompt_cache_key"], + retried_ephemeral_request_body["prompt_cache_key"], + "retried ephemeral review should preserve the guardian prompt cache key" + ); + let third_request_body_text = retried_ephemeral_request_body.to_string(); assert!( third_request_body_text.contains("first guardian rationale"), "forked guardian review should include the last committed trunk assessment" ); - let third_user_message = last_user_message_text_from_body(&third_request_body); + let third_user_message = last_user_message_text_from_body(&retried_ephemeral_request_body); assert!(third_user_message.contains(">>> TRANSCRIPT DELTA START\n")); assert!( third_user_message.contains("[5] user: Please inspect pending changes before pushing.") @@ -2315,7 +2629,7 @@ async fn guardian_parallel_reviews_fork_from_last_committed_trunk_history() -> a match handle.join() { Ok(result) => result, 
Err(_) => Err(anyhow::anyhow!( - "guardian_parallel_reviews_fork_from_last_committed_trunk_history thread panicked" + "guardian_ephemeral_retry_preserves_parallel_trunk_and_fork_history thread panicked" )), } } @@ -2541,6 +2855,10 @@ async fn guardian_review_session_config_keeps_bedrock_provider_for_bedrock_gpt_5 ) .expect("guardian config"); + let mut expected_model_provider = + ModelProviderInfo::create_amazon_bedrock_provider(/*aws*/ None); + expected_model_provider.request_max_retries = Some(1); + expected_model_provider.stream_max_retries = Some(1); assert_eq!( ( guardian_config.model, @@ -2550,7 +2868,7 @@ async fn guardian_review_session_config_keeps_bedrock_provider_for_bedrock_gpt_5 ( Some(AMAZON_BEDROCK_GPT_5_4_MODEL_ID.to_string()), AMAZON_BEDROCK_PROVIDER_ID.to_string(), - ModelProviderInfo::create_amazon_bedrock_provider(/*aws*/ None), + expected_model_provider, ) ); }