diff --git a/codex-rs/exec-server/src/client_recovery.rs b/codex-rs/exec-server/src/client_recovery.rs index 9a2f3c29a..77feb0a4b 100644 --- a/codex-rs/exec-server/src/client_recovery.rs +++ b/codex-rs/exec-server/src/client_recovery.rs @@ -1,3 +1,6 @@ +use std::collections::hash_map::DefaultHasher; +use std::hash::Hash; +use std::hash::Hasher; use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::Duration; @@ -17,6 +20,7 @@ use super::disconnected_message; use super::fail_all_in_flight_work; use super::handle_server_notification; use super::is_transport_closed_error; +use crate::client_transport::ExecServerReconnectStrategy; use crate::process::ExecProcessEvent; use crate::protocol::EXEC_READ_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; @@ -35,6 +39,8 @@ const SESSION_RECOVERY_TIMEOUT: Duration = Duration::from_millis(500); // client and server start their disconnect clocks independently. const SESSION_RECOVERY_TIMEOUT: Duration = Duration::from_secs(25); const SESSION_RECOVERY_RETRY_INTERVAL: Duration = Duration::from_millis(100); +const REGISTRY_RECOVERY_INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(500); +const REGISTRY_RECOVERY_MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5); impl SessionState { fn last_published_seq(&self) -> u64 { @@ -285,14 +291,18 @@ impl Inner { self.fail(message).await; return; }; + let uses_registry_backoff = matches!( + self.reconnect_strategy.as_ref(), + Some(ExecServerReconnectStrategy::NoiseRendezvous { .. }) + ); + let mut registry_retry_attempt = 0; let last_error = loop { match timeout_at(deadline, self.resume_once(&session_id)).await { - Ok(Ok(candidate)) if !candidate.is_disconnected() => { - if self.install_recovered_client(candidate) { + Ok(Ok(candidate)) => { + if !candidate.is_disconnected() && self.install_recovered_client(candidate) { return; } } - Ok(Ok(_)) => {} Ok(Err(error)) if !is_retryable_recovery_error(&error) => { break error.to_string(); } @@ -302,11 +312,19 @@ impl Inner { } } + let retry_delay = if uses_registry_backoff { + let delay = registry_recovery_retry_delay(&session_id, registry_retry_attempt); + registry_retry_attempt = registry_retry_attempt.saturating_add(1); + delay + } else { + SESSION_RECOVERY_RETRY_INTERVAL + }; + let now = Instant::now(); if now >= deadline { break format!("recovery timed out after {SESSION_RECOVERY_TIMEOUT:?}"); } - sleep(SESSION_RECOVERY_RETRY_INTERVAL.min(deadline - now)).await; + sleep(retry_delay.min(deadline - now)).await; }; let message = @@ -496,21 +514,41 @@ fn is_retryable_recovery_error(error: &ExecServerError) -> bool { | ExecServerError::WebSocketConnect { .. } | ExecServerError::InitializeTimedOut { .. } ) - || matches!( - error, - ExecServerError::EnvironmentRegistryRequest(error) - if error.is_connect() || error.is_timeout() - ) - || matches!( - error, - ExecServerError::EnvironmentRegistryHttp { status, .. } - if status.is_server_error() - || *status == reqwest::StatusCode::REQUEST_TIMEOUT - || *status == reqwest::StatusCode::TOO_MANY_REQUESTS - ) + || is_retryable_registry_error(error) || matches!( error, ExecServerError::Server { code, .. } if *code == SESSION_ALREADY_ATTACHED_ERROR_CODE ) } + +fn is_retryable_registry_error(error: &ExecServerError) -> bool { + matches!( + error, + ExecServerError::EnvironmentRegistryRequest(error) + if error.is_connect() || error.is_timeout() + ) || matches!( + error, + ExecServerError::EnvironmentRegistryHttp { status, .. } + if status.is_server_error() + || *status == reqwest::StatusCode::REQUEST_TIMEOUT + || *status == reqwest::StatusCode::TOO_MANY_REQUESTS + ) +} + +fn registry_recovery_retry_delay(session_id: &str, attempt: u32) -> Duration { + let multiplier = 1_u32.checked_shl(attempt.min(4)).unwrap_or(u32::MAX); + let base_delay = REGISTRY_RECOVERY_INITIAL_RETRY_INTERVAL + .saturating_mul(multiplier) + .min(REGISTRY_RECOVERY_MAX_RETRY_INTERVAL); + let base_millis = base_delay.as_millis() as u64; + let mut hasher = DefaultHasher::new(); + session_id.hash(&mut hasher); + attempt.hash(&mut hasher); + + Duration::from_millis(base_millis + hasher.finish() % (base_millis / 2 + 1)) +} + +#[cfg(test)] +#[path = "client_recovery_tests.rs"] +mod tests; diff --git a/codex-rs/exec-server/src/client_recovery_tests.rs b/codex-rs/exec-server/src/client_recovery_tests.rs new file mode 100644 index 000000000..a6ff91fc9 --- /dev/null +++ b/codex-rs/exec-server/src/client_recovery_tests.rs @@ -0,0 +1,40 @@ +use std::time::Duration; + +use super::*; + +fn registry_error() -> ExecServerError { + ExecServerError::EnvironmentRegistryHttp { + status: reqwest::StatusCode::TOO_MANY_REQUESTS, + code: None, + message: "registry unavailable".to_string(), + } +} + +#[test] +fn registry_recovery_retry_delay_exponentially_backs_off_and_caps() { + let cases = [ + (0, Duration::from_millis(500)), + (1, Duration::from_secs(1)), + (2, Duration::from_secs(2)), + (3, Duration::from_secs(4)), + (4, Duration::from_secs(5)), + (20, Duration::from_secs(5)), + ]; + + for (attempt, base) in cases { + let delay = registry_recovery_retry_delay("session-1", attempt); + assert!(delay >= base, "delay {delay:?} for attempt {attempt}"); + assert!( + delay <= base + base / 2, + "delay {delay:?} for attempt {attempt}" + ); + } +} + +#[test] +fn recovery_retries_transient_registry_errors() { + let error = registry_error(); + + assert!(is_retryable_registry_error(&error)); + assert!(is_retryable_recovery_error(&error)); +}