From a5229e06867b83b2cf331c852cebfcc74828bd24 Mon Sep 17 00:00:00 2001 From: jif Date: Wed, 17 Jun 2026 10:52:23 +0100 Subject: [PATCH] Back off registry retries during exec recovery (#28546) ## Why PR #28512 retries a failed session recovery every 100 ms. Every Noise recovery attempt first asks the environment registry for a fresh connection bundle, even when the eventual failure comes from the WebSocket or initialize handshake. During an outage, that could make each disconnected client call the registry about 250 times during the 25-second recovery window. ## What changes All retryable Noise recovery failures now use a separate backoff schedule: ```text base: 500 ms -> 1 s -> 2 s -> 4 s -> 5 s maximum actual: 500-750 ms, 1-1.5 s, 2-3 s, 4-6 s, 5-7.5 s ``` The extra 0-50% is deterministic per-session jitter so disconnected clients do not retry together. Direct WebSocket recovery keeps the existing 100 ms retry because it does not re-enter the registry. --- codex-rs/exec-server/src/client_recovery.rs | 70 ++++++++++++++----- .../exec-server/src/client_recovery_tests.rs | 40 +++++++++++ 2 files changed, 94 insertions(+), 16 deletions(-) create mode 100644 codex-rs/exec-server/src/client_recovery_tests.rs diff --git a/codex-rs/exec-server/src/client_recovery.rs b/codex-rs/exec-server/src/client_recovery.rs index 9a2f3c29a..77feb0a4b 100644 --- a/codex-rs/exec-server/src/client_recovery.rs +++ b/codex-rs/exec-server/src/client_recovery.rs @@ -1,3 +1,6 @@ +use std::collections::hash_map::DefaultHasher; +use std::hash::Hash; +use std::hash::Hasher; use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::Duration; @@ -17,6 +20,7 @@ use super::disconnected_message; use super::fail_all_in_flight_work; use super::handle_server_notification; use super::is_transport_closed_error; +use crate::client_transport::ExecServerReconnectStrategy; use crate::process::ExecProcessEvent; use crate::protocol::EXEC_READ_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; @@ -35,6 +39,8 @@ const SESSION_RECOVERY_TIMEOUT: Duration = Duration::from_millis(500); // client and server start their disconnect clocks independently. const SESSION_RECOVERY_TIMEOUT: Duration = Duration::from_secs(25); const SESSION_RECOVERY_RETRY_INTERVAL: Duration = Duration::from_millis(100); +const REGISTRY_RECOVERY_INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(500); +const REGISTRY_RECOVERY_MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5); impl SessionState { fn last_published_seq(&self) -> u64 { @@ -285,14 +291,18 @@ impl Inner { self.fail(message).await; return; }; + let uses_registry_backoff = matches!( + self.reconnect_strategy.as_ref(), + Some(ExecServerReconnectStrategy::NoiseRendezvous { .. }) + ); + let mut registry_retry_attempt = 0; let last_error = loop { match timeout_at(deadline, self.resume_once(&session_id)).await { - Ok(Ok(candidate)) if !candidate.is_disconnected() => { - if self.install_recovered_client(candidate) { + Ok(Ok(candidate)) => { + if !candidate.is_disconnected() && self.install_recovered_client(candidate) { return; } } - Ok(Ok(_)) => {} Ok(Err(error)) if !is_retryable_recovery_error(&error) => { break error.to_string(); } @@ -302,11 +312,19 @@ impl Inner { } } + let retry_delay = if uses_registry_backoff { + let delay = registry_recovery_retry_delay(&session_id, registry_retry_attempt); + registry_retry_attempt = registry_retry_attempt.saturating_add(1); + delay + } else { + SESSION_RECOVERY_RETRY_INTERVAL + }; + let now = Instant::now(); if now >= deadline { break format!("recovery timed out after {SESSION_RECOVERY_TIMEOUT:?}"); } - sleep(SESSION_RECOVERY_RETRY_INTERVAL.min(deadline - now)).await; + sleep(retry_delay.min(deadline - now)).await; }; let message = @@ -496,21 +514,41 @@ fn is_retryable_recovery_error(error: &ExecServerError) -> bool { | ExecServerError::WebSocketConnect { .. } | ExecServerError::InitializeTimedOut { .. } ) - || matches!( - error, - ExecServerError::EnvironmentRegistryRequest(error) - if error.is_connect() || error.is_timeout() - ) - || matches!( - error, - ExecServerError::EnvironmentRegistryHttp { status, .. } - if status.is_server_error() - || *status == reqwest::StatusCode::REQUEST_TIMEOUT - || *status == reqwest::StatusCode::TOO_MANY_REQUESTS - ) + || is_retryable_registry_error(error) || matches!( error, ExecServerError::Server { code, .. } if *code == SESSION_ALREADY_ATTACHED_ERROR_CODE ) } + +fn is_retryable_registry_error(error: &ExecServerError) -> bool { + matches!( + error, + ExecServerError::EnvironmentRegistryRequest(error) + if error.is_connect() || error.is_timeout() + ) || matches!( + error, + ExecServerError::EnvironmentRegistryHttp { status, .. } + if status.is_server_error() + || *status == reqwest::StatusCode::REQUEST_TIMEOUT + || *status == reqwest::StatusCode::TOO_MANY_REQUESTS + ) +} + +fn registry_recovery_retry_delay(session_id: &str, attempt: u32) -> Duration { + let multiplier = 1_u32.checked_shl(attempt.min(4)).unwrap_or(u32::MAX); + let base_delay = REGISTRY_RECOVERY_INITIAL_RETRY_INTERVAL + .saturating_mul(multiplier) + .min(REGISTRY_RECOVERY_MAX_RETRY_INTERVAL); + let base_millis = base_delay.as_millis() as u64; + let mut hasher = DefaultHasher::new(); + session_id.hash(&mut hasher); + attempt.hash(&mut hasher); + + Duration::from_millis(base_millis + hasher.finish() % (base_millis / 2 + 1)) +} + +#[cfg(test)] +#[path = "client_recovery_tests.rs"] +mod tests; diff --git a/codex-rs/exec-server/src/client_recovery_tests.rs b/codex-rs/exec-server/src/client_recovery_tests.rs new file mode 100644 index 000000000..a6ff91fc9 --- /dev/null +++ b/codex-rs/exec-server/src/client_recovery_tests.rs @@ -0,0 +1,40 @@ +use std::time::Duration; + +use super::*; + +fn registry_error() -> ExecServerError { + ExecServerError::EnvironmentRegistryHttp { + status: reqwest::StatusCode::TOO_MANY_REQUESTS, + code: None, + message: "registry unavailable".to_string(), + } +} + +#[test] +fn registry_recovery_retry_delay_exponentially_backs_off_and_caps() { + let cases = [ + (0, Duration::from_millis(500)), + (1, Duration::from_secs(1)), + (2, Duration::from_secs(2)), + (3, Duration::from_secs(4)), + (4, Duration::from_secs(5)), + (20, Duration::from_secs(5)), + ]; + + for (attempt, base) in cases { + let delay = registry_recovery_retry_delay("session-1", attempt); + assert!(delay >= base, "delay {delay:?} for attempt {attempt}"); + assert!( + delay <= base + base / 2, + "delay {delay:?} for attempt {attempt}" + ); + } +} + +#[test] +fn recovery_retries_transient_registry_errors() { + let error = registry_error(); + + assert!(is_retryable_registry_error(&error)); + assert!(is_retryable_recovery_error(&error)); +}