mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
Back off registry retries during exec recovery (#28546)
## Why

PR #28512 retries a failed session recovery every 100 ms. Every Noise recovery attempt first asks the environment registry for a fresh connection bundle, even when the eventual failure comes from the WebSocket or initialize handshake. During an outage, that could make each disconnected client call the registry about 250 times during the 25-second recovery window.

## What changes

All retryable Noise recovery failures now use a separate backoff schedule:

```text
base:   500 ms -> 1 s -> 2 s -> 4 s -> 5 s maximum
actual: 500-750 ms, 1-1.5 s, 2-3 s, 4-6 s, 5-7.5 s
```

The extra 0-50% is deterministic per-session jitter so disconnected clients do not retry together. Direct WebSocket recovery keeps the existing 100 ms retry because it does not re-enter the registry.
This commit is contained in:
@@ -1,3 +1,6 @@
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::Hash;
|
||||
use std::hash::Hasher;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
@@ -17,6 +20,7 @@ use super::disconnected_message;
|
||||
use super::fail_all_in_flight_work;
|
||||
use super::handle_server_notification;
|
||||
use super::is_transport_closed_error;
|
||||
use crate::client_transport::ExecServerReconnectStrategy;
|
||||
use crate::process::ExecProcessEvent;
|
||||
use crate::protocol::EXEC_READ_METHOD;
|
||||
use crate::protocol::EXEC_TERMINATE_METHOD;
|
||||
@@ -35,6 +39,8 @@ const SESSION_RECOVERY_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
// client and server start their disconnect clocks independently.
|
||||
const SESSION_RECOVERY_TIMEOUT: Duration = Duration::from_secs(25);
|
||||
const SESSION_RECOVERY_RETRY_INTERVAL: Duration = Duration::from_millis(100);
|
||||
const REGISTRY_RECOVERY_INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(500);
|
||||
const REGISTRY_RECOVERY_MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
impl SessionState {
|
||||
fn last_published_seq(&self) -> u64 {
|
||||
@@ -285,14 +291,18 @@ impl Inner {
|
||||
self.fail(message).await;
|
||||
return;
|
||||
};
|
||||
let uses_registry_backoff = matches!(
|
||||
self.reconnect_strategy.as_ref(),
|
||||
Some(ExecServerReconnectStrategy::NoiseRendezvous { .. })
|
||||
);
|
||||
let mut registry_retry_attempt = 0;
|
||||
let last_error = loop {
|
||||
match timeout_at(deadline, self.resume_once(&session_id)).await {
|
||||
Ok(Ok(candidate)) if !candidate.is_disconnected() => {
|
||||
if self.install_recovered_client(candidate) {
|
||||
Ok(Ok(candidate)) => {
|
||||
if !candidate.is_disconnected() && self.install_recovered_client(candidate) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Ok(Ok(_)) => {}
|
||||
Ok(Err(error)) if !is_retryable_recovery_error(&error) => {
|
||||
break error.to_string();
|
||||
}
|
||||
@@ -302,11 +312,19 @@ impl Inner {
|
||||
}
|
||||
}
|
||||
|
||||
let retry_delay = if uses_registry_backoff {
|
||||
let delay = registry_recovery_retry_delay(&session_id, registry_retry_attempt);
|
||||
registry_retry_attempt = registry_retry_attempt.saturating_add(1);
|
||||
delay
|
||||
} else {
|
||||
SESSION_RECOVERY_RETRY_INTERVAL
|
||||
};
|
||||
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
break format!("recovery timed out after {SESSION_RECOVERY_TIMEOUT:?}");
|
||||
}
|
||||
sleep(SESSION_RECOVERY_RETRY_INTERVAL.min(deadline - now)).await;
|
||||
sleep(retry_delay.min(deadline - now)).await;
|
||||
};
|
||||
|
||||
let message =
|
||||
@@ -496,21 +514,41 @@ fn is_retryable_recovery_error(error: &ExecServerError) -> bool {
|
||||
| ExecServerError::WebSocketConnect { .. }
|
||||
| ExecServerError::InitializeTimedOut { .. }
|
||||
)
|
||||
|| matches!(
|
||||
error,
|
||||
ExecServerError::EnvironmentRegistryRequest(error)
|
||||
if error.is_connect() || error.is_timeout()
|
||||
)
|
||||
|| matches!(
|
||||
error,
|
||||
ExecServerError::EnvironmentRegistryHttp { status, .. }
|
||||
if status.is_server_error()
|
||||
|| *status == reqwest::StatusCode::REQUEST_TIMEOUT
|
||||
|| *status == reqwest::StatusCode::TOO_MANY_REQUESTS
|
||||
)
|
||||
|| is_retryable_registry_error(error)
|
||||
|| matches!(
|
||||
error,
|
||||
ExecServerError::Server { code, .. }
|
||||
if *code == SESSION_ALREADY_ATTACHED_ERROR_CODE
|
||||
)
|
||||
}
|
||||
|
||||
fn is_retryable_registry_error(error: &ExecServerError) -> bool {
|
||||
matches!(
|
||||
error,
|
||||
ExecServerError::EnvironmentRegistryRequest(error)
|
||||
if error.is_connect() || error.is_timeout()
|
||||
) || matches!(
|
||||
error,
|
||||
ExecServerError::EnvironmentRegistryHttp { status, .. }
|
||||
if status.is_server_error()
|
||||
|| *status == reqwest::StatusCode::REQUEST_TIMEOUT
|
||||
|| *status == reqwest::StatusCode::TOO_MANY_REQUESTS
|
||||
)
|
||||
}
|
||||
|
||||
fn registry_recovery_retry_delay(session_id: &str, attempt: u32) -> Duration {
|
||||
let multiplier = 1_u32.checked_shl(attempt.min(4)).unwrap_or(u32::MAX);
|
||||
let base_delay = REGISTRY_RECOVERY_INITIAL_RETRY_INTERVAL
|
||||
.saturating_mul(multiplier)
|
||||
.min(REGISTRY_RECOVERY_MAX_RETRY_INTERVAL);
|
||||
let base_millis = base_delay.as_millis() as u64;
|
||||
let mut hasher = DefaultHasher::new();
|
||||
session_id.hash(&mut hasher);
|
||||
attempt.hash(&mut hasher);
|
||||
|
||||
Duration::from_millis(base_millis + hasher.finish() % (base_millis / 2 + 1))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "client_recovery_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn registry_error() -> ExecServerError {
|
||||
ExecServerError::EnvironmentRegistryHttp {
|
||||
status: reqwest::StatusCode::TOO_MANY_REQUESTS,
|
||||
code: None,
|
||||
message: "registry unavailable".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks that each attempt's delay lies between its un-jittered base and the
/// base plus 50%, and that the base stops growing at five seconds.
#[test]
fn registry_recovery_retry_delay_exponentially_backs_off_and_caps() {
    // (attempt, expected un-jittered base delay)
    let expected_bases = [
        (0, Duration::from_millis(500)),
        (1, Duration::from_secs(1)),
        (2, Duration::from_secs(2)),
        (3, Duration::from_secs(4)),
        (4, Duration::from_secs(5)),
        (20, Duration::from_secs(5)),
    ];

    for (attempt, base) in expected_bases {
        let upper_bound = base + base / 2;
        let delay = registry_recovery_retry_delay("session-1", attempt);

        assert!(delay >= base, "delay {delay:?} for attempt {attempt}");
        assert!(
            delay <= upper_bound,
            "delay {delay:?} for attempt {attempt}"
        );
    }
}
|
||||
|
||||
/// A `429` registry response must be classified as retryable both by the
/// registry-specific predicate and by the general recovery predicate.
#[test]
fn recovery_retries_transient_registry_errors() {
    let transient = registry_error();

    let retryable_for_registry = is_retryable_registry_error(&transient);
    let retryable_for_recovery = is_retryable_recovery_error(&transient);

    assert!(retryable_for_registry);
    assert!(retryable_for_recovery);
}
|
||||
Reference in New Issue
Block a user