**refactor(executor): simplify Gemini CLI execution and remove internal retry logic**

- Removed the nested retry handling for 429 rate limit errors; the executor now falls through to the next candidate model instead of sleeping and retrying in place.
- Simplified request/response handling by removing the redundant retry-related code and the max-retry configuration logic.
- Reworked `parseRetryDelay` to return the parsed hint (or an error) and surfaced it to the core manager as a `RetryAfter` hint on `statusErr`, so quota cooldowns can honor the provider-supplied delay.
Author: Luis Pater
Date: 2025-11-20 17:28:22 +08:00
Parent: 0586da9c2b
Commit: d50b0f7524
3 changed files with 217 additions and 272 deletions
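In short: rather than sleeping and retrying inside the executor, a 429 now surfaces the provider's `retryDelay` as a `RetryAfter` hint on the returned error, and the core manager uses that hint when scheduling the auth's quota cooldown. A minimal, self-contained sketch of that hand-off (type and method names follow the diff below; the `main` wiring is illustrative only):

```go
package main

import (
	"fmt"
	"time"
)

// statusErr mirrors the executor-side error type in this diff: an HTTP status,
// a message, and an optional provider-supplied retry hint.
type statusErr struct {
	code       int
	msg        string
	retryAfter *time.Duration
}

func (e statusErr) Error() string              { return fmt.Sprintf("status %d", e.code) }
func (e statusErr) StatusCode() int            { return e.code }
func (e statusErr) RetryAfter() *time.Duration { return e.retryAfter }

// retryAfterFromError is the manager-side check: any error exposing
// RetryAfter() can carry the hint without the manager importing the executor.
func retryAfterFromError(err error) *time.Duration {
	type retryAfterProvider interface{ RetryAfter() *time.Duration }
	if rap, ok := err.(retryAfterProvider); ok {
		if ra := rap.RetryAfter(); ra != nil {
			val := *ra
			return &val
		}
	}
	return nil
}

func main() {
	delay := 850 * time.Millisecond
	err := statusErr{code: 429, msg: "rate limited", retryAfter: &delay}
	if ra := retryAfterFromError(err); ra != nil {
		// MarkResult would schedule now.Add(*ra) instead of walking the backoff ladder.
		fmt.Printf("schedule next attempt after %v\n", *ra)
	}
}
```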


@@ -99,15 +99,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
 	var lastStatus int
 	var lastBody []byte
-	// Get max retry count from config, default to 3 if not set
-	maxRetries := e.cfg.RequestRetry
-	if maxRetries <= 0 {
-		maxRetries = 3
-	}
 	for idx, attemptModel := range models {
-		// Inner retry loop for 429 errors on the same model
-		for retryCount := 0; retryCount <= maxRetries; retryCount++ {
 		payload := append([]byte(nil), basePayload...)
 		if action == "countTokens" {
 			payload = deleteJSONField(payload, "project")
@@ -179,43 +171,17 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
 		lastStatus = httpResp.StatusCode
 		lastBody = append([]byte(nil), data...)
 		log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
-		// Handle 429 rate limit errors with retry
 		if httpResp.StatusCode == 429 {
-			if retryCount < maxRetries {
-				// Parse retry delay from Google's response
-				retryDelay := parseRetryDelay(data)
-				log.Infof("gemini cli executor: rate limited (429), retrying model %s in %v (attempt %d/%d)", attemptModel, retryDelay, retryCount+1, maxRetries)
-				// Wait for the specified delay
-				select {
-				case <-time.After(retryDelay):
-					// Continue to next retry iteration
-					continue
-				case <-ctx.Done():
-					// Context cancelled, return immediately
-					err = ctx.Err()
-					return resp, err
-				}
-			} else {
-				// Exhausted retries for this model, try next model if available
 			if idx+1 < len(models) {
-				log.Infof("gemini cli executor: rate limited, exhausted %d retries for model %s, trying fallback model: %s", maxRetries, attemptModel, models[idx+1])
-				break // Break inner loop to try next model
+				log.Debugf("gemini cli executor: rate limited, retrying with next model: %s", models[idx+1])
 			} else {
-				log.Infof("gemini cli executor: rate limited, exhausted %d retries for model %s, no additional fallback model", maxRetries, attemptModel)
-				// No more models to try, will return error below
+				log.Debug("gemini cli executor: rate limited, no additional fallback model")
 			}
-			}
-		} else {
-			// Non-429 error, don't retry this model
-			err = statusErr{code: httpResp.StatusCode, msg: string(data)}
-			return resp, err
+			continue
 		}
-		// Break inner loop if we hit this point (no retry needed or exhausted retries)
-		break
-		}
+		err = newGeminiStatusErr(httpResp.StatusCode, data)
+		return resp, err
 	}
 	if len(lastBody) > 0 {
@@ -224,7 +190,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
 	if lastStatus == 0 {
 		lastStatus = 429
 	}
-	err = statusErr{code: lastStatus, msg: string(lastBody)}
+	err = newGeminiStatusErr(lastStatus, lastBody)
 	return resp, err
 }
@@ -269,21 +235,8 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
 	var lastStatus int
 	var lastBody []byte
-	// Get max retry count from config, default to 3 if not set
-	maxRetries := e.cfg.RequestRetry
-	if maxRetries <= 0 {
-		maxRetries = 3
-	}
 	for idx, attemptModel := range models {
-		var httpResp *http.Response
-		var payload []byte
-		var errDo error
-		shouldContinueToNextModel := false
-		// Inner retry loop for 429 errors on the same model
-		for retryCount := 0; retryCount <= maxRetries; retryCount++ {
-			payload = append([]byte(nil), basePayload...)
+		payload := append([]byte(nil), basePayload...)
 		payload = setJSONField(payload, "project", projectID)
 		payload = setJSONField(payload, "model", attemptModel)
@@ -322,7 +275,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
 			AuthValue: authValue,
 		})
-		httpResp, errDo = httpClient.Do(reqHTTP)
+		httpResp, errDo := httpClient.Do(reqHTTP)
 		if errDo != nil {
 			recordAPIResponseError(ctx, e.cfg, errDo)
 			err = errDo
@@ -343,61 +296,18 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
 		lastStatus = httpResp.StatusCode
 		lastBody = append([]byte(nil), data...)
 		log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
-		// Handle 429 rate limit errors with retry
 		if httpResp.StatusCode == 429 {
-			if retryCount < maxRetries {
-				// Parse retry delay from Google's response
-				retryDelay := parseRetryDelay(data)
-				log.Infof("gemini cli executor: rate limited (429), retrying stream model %s in %v (attempt %d/%d)", attemptModel, retryDelay, retryCount+1, maxRetries)
-				// Wait for the specified delay
-				select {
-				case <-time.After(retryDelay):
-					// Continue to next retry iteration
-					continue
-				case <-ctx.Done():
-					// Context cancelled, return immediately
-					err = ctx.Err()
-					return nil, err
-				}
-			} else {
-				// Exhausted retries for this model, try next model if available
 			if idx+1 < len(models) {
-				log.Infof("gemini cli executor: rate limited, exhausted %d retries for stream model %s, trying fallback model: %s", maxRetries, attemptModel, models[idx+1])
-				shouldContinueToNextModel = true
-				break // Break inner loop to try next model
+				log.Debugf("gemini cli executor: rate limited, retrying with next model: %s", models[idx+1])
 			} else {
-				log.Infof("gemini cli executor: rate limited, exhausted %d retries for stream model %s, no additional fallback model", maxRetries, attemptModel)
-				// No more models to try, will return error below
+				log.Debug("gemini cli executor: rate limited, no additional fallback model")
 			}
-			}
-		} else {
-			// Non-429 error, don't retry this model
-			err = statusErr{code: httpResp.StatusCode, msg: string(data)}
+			continue
 		}
+		err = newGeminiStatusErr(httpResp.StatusCode, data)
 		return nil, err
 	}
-		// Break inner loop if we hit this point (no retry needed or exhausted retries)
-		break
-		}
-		// Success - httpResp.StatusCode is 2xx, break out of retry loop
-		// and proceed to streaming logic below
-		break
-	}
-	// If we need to try the next fallback model, skip streaming logic
-	if shouldContinueToNextModel {
-		continue
-	}
-	// If we have a failed response (non-2xx), don't attempt streaming
-	// Continue outer loop to try next model or return error
-	if httpResp == nil || httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
-		continue
-	}
 	out := make(chan cliproxyexecutor.StreamChunk)
 	stream = out
 	go func(resp *http.Response, reqBody []byte, attempt string) {
@@ -467,7 +377,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
 	if lastStatus == 0 {
 		lastStatus = 429
 	}
-	err = statusErr{code: lastStatus, msg: string(lastBody)}
+	err = newGeminiStatusErr(lastStatus, lastBody)
 	return nil, err
 }
@@ -575,7 +485,7 @@ func (e *GeminiCLIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.
 	if lastStatus == 0 {
 		lastStatus = 429
 	}
-	return cliproxyexecutor.Response{}, statusErr{code: lastStatus, msg: string(lastBody)}
+	return cliproxyexecutor.Response{}, newGeminiStatusErr(lastStatus, lastBody)
 }
 func (e *GeminiCLIExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) {
@@ -860,19 +770,25 @@ func fixGeminiCLIImageAspectRatio(modelName string, rawJSON []byte) []byte {
 	return rawJSON
 }
+func newGeminiStatusErr(statusCode int, body []byte) statusErr {
+	err := statusErr{code: statusCode, msg: string(body)}
+	if statusCode == http.StatusTooManyRequests {
+		if retryAfter, parseErr := parseRetryDelay(body); parseErr == nil && retryAfter != nil {
+			err.retryAfter = retryAfter
+		}
+	}
+	return err
+}
 // parseRetryDelay extracts the retry delay from a Google API 429 error response.
 // The error response contains a RetryInfo.retryDelay field in the format "0.847655010s".
-// Returns the duration to wait, or a default duration if parsing fails.
-func parseRetryDelay(errorBody []byte) time.Duration {
-	const defaultDelay = 1 * time.Second
-	const maxDelay = 60 * time.Second
+// Returns the parsed duration or an error if it cannot be determined.
+func parseRetryDelay(errorBody []byte) (*time.Duration, error) {
 	// Try to parse the retryDelay from the error response
 	// Format: error.details[].retryDelay where @type == "type.googleapis.com/google.rpc.RetryInfo"
 	details := gjson.GetBytes(errorBody, "error.details")
 	if !details.Exists() || !details.IsArray() {
-		log.Debugf("parseRetryDelay: no error.details found, using default delay %v", defaultDelay)
-		return defaultDelay
+		return nil, fmt.Errorf("no error.details found")
 	}
 	for _, detail := range details.Array() {
@@ -883,24 +799,12 @@ func parseRetryDelay(errorBody []byte) time.Duration {
 			// Parse duration string like "0.847655010s"
 			duration, err := time.ParseDuration(retryDelay)
 			if err != nil {
-				log.Debugf("parseRetryDelay: failed to parse duration %q: %v, using default", retryDelay, err)
-				return defaultDelay
+				return nil, fmt.Errorf("failed to parse duration")
 			}
-			// Cap at maxDelay to prevent excessive waits
-			if duration > maxDelay {
-				log.Debugf("parseRetryDelay: capping delay from %v to %v", duration, maxDelay)
-				return maxDelay
-			}
-			if duration < 0 {
-				log.Debugf("parseRetryDelay: negative delay %v, using default", duration)
-				return defaultDelay
-			}
-			log.Debugf("parseRetryDelay: using delay %v from API response", duration)
-			return duration
+			return &duration, nil
 		}
 	}
 }
-	log.Debugf("parseRetryDelay: no RetryInfo found, using default delay %v", defaultDelay)
-	return defaultDelay
+	return nil, fmt.Errorf("no RetryInfo found")
 }
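For reference, the `RetryInfo` detail that `parseRetryDelay` reads sits under `error.details[]` with `@type == "type.googleapis.com/google.rpc.RetryInfo"` and a `retryDelay` string such as `"0.847655010s"`. A standalone sketch of that extraction with `gjson` (the body below is illustrative, not a captured response):

```go
package main

import (
	"fmt"
	"time"

	"github.com/tidwall/gjson"
)

// Illustrative 429 body carrying the RetryInfo detail that parseRetryDelay looks for.
const body = `{
  "error": {
    "code": 429,
    "status": "RESOURCE_EXHAUSTED",
    "details": [
      {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.847655010s"}
    ]
  }
}`

func main() {
	// Walk error.details and pick out the RetryInfo entry.
	for _, detail := range gjson.Get(body, "error.details").Array() {
		// Map() is used here so the literal "@type" key needs no path escaping.
		if detail.Map()["@type"].String() != "type.googleapis.com/google.rpc.RetryInfo" {
			continue
		}
		if d, err := time.ParseDuration(detail.Map()["retryDelay"].String()); err == nil {
			fmt.Println("provider retry hint:", d) // ~847.655ms
		}
	}
}
```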


@@ -8,6 +8,7 @@ import (
 	"io"
 	"net/http"
 	"strings"
+	"time"

 	"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
 	"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
@@ -342,6 +343,7 @@ func (e *OpenAICompatExecutor) overrideModel(payload []byte, model string) []byt
 type statusErr struct {
 	code int
 	msg  string
+	retryAfter *time.Duration
 }
 func (e statusErr) Error() string {
@@ -351,3 +353,4 @@ func (e statusErr) Error() string {
 	return fmt.Sprintf("status %d", e.code)
 }
 func (e statusErr) StatusCode() int { return e.code }
+func (e statusErr) RetryAfter() *time.Duration { return e.retryAfter }


@@ -62,6 +62,8 @@ type Result struct {
 	Model string
 	// Success marks whether the execution succeeded.
 	Success bool
+	// RetryAfter carries a provider supplied retry hint (e.g. 429 retryDelay).
+	RetryAfter *time.Duration
 	// Error describes the failure when Success is false.
 	Error *Error
 }
@@ -325,6 +327,9 @@ func (m *Manager) executeWithProvider(ctx context.Context, provider string, req
 		if errors.As(errExec, &se) && se != nil {
 			result.Error.HTTPStatus = se.StatusCode()
 		}
+		if ra := retryAfterFromError(errExec); ra != nil {
+			result.RetryAfter = ra
+		}
 		m.MarkResult(execCtx, result)
 		lastErr = errExec
 		continue
@@ -370,6 +375,9 @@ func (m *Manager) executeCountWithProvider(ctx context.Context, provider string,
 		if errors.As(errExec, &se) && se != nil {
 			result.Error.HTTPStatus = se.StatusCode()
 		}
+		if ra := retryAfterFromError(errExec); ra != nil {
+			result.RetryAfter = ra
+		}
 		m.MarkResult(execCtx, result)
 		lastErr = errExec
 		continue
@@ -415,6 +423,7 @@ func (m *Manager) executeStreamWithProvider(ctx context.Context, provider string
 			rerr.HTTPStatus = se.StatusCode()
 		}
 		result := Result{AuthID: auth.ID, Provider: provider, Model: req.Model, Success: false, Error: rerr}
+		result.RetryAfter = retryAfterFromError(errStream)
 		m.MarkResult(execCtx, result)
 		lastErr = errStream
 		continue
@@ -556,17 +565,23 @@ func (m *Manager) MarkResult(ctx context.Context, result Result) {
 		suspendReason = "payment_required"
 		shouldSuspendModel = true
 	case 429:
-		cooldown, nextLevel := nextQuotaCooldown(state.Quota.BackoffLevel)
 		var next time.Time
+		backoffLevel := state.Quota.BackoffLevel
+		if result.RetryAfter != nil {
+			next = now.Add(*result.RetryAfter)
+		} else {
+			cooldown, nextLevel := nextQuotaCooldown(backoffLevel)
 			if cooldown > 0 {
 				next = now.Add(cooldown)
 			}
+			backoffLevel = nextLevel
+		}
 		state.NextRetryAfter = next
 		state.Quota = QuotaState{
 			Exceeded:      true,
 			Reason:        "quota",
 			NextRecoverAt: next,
-			BackoffLevel:  nextLevel,
+			BackoffLevel:  backoffLevel,
 		}
 		suspendReason = "quota"
 		shouldSuspendModel = true
@@ -582,7 +597,7 @@ func (m *Manager) MarkResult(ctx context.Context, result Result) {
 		auth.UpdatedAt = now
 		updateAggregatedAvailability(auth, now)
 	} else {
-		applyAuthFailureState(auth, result.Error, now)
+		applyAuthFailureState(auth, result.Error, result.RetryAfter, now)
 	}
 }
@@ -742,6 +757,25 @@ func cloneError(err *Error) *Error {
 	}
 }
+func retryAfterFromError(err error) *time.Duration {
+	if err == nil {
+		return nil
+	}
+	type retryAfterProvider interface {
+		RetryAfter() *time.Duration
+	}
+	rap, ok := err.(retryAfterProvider)
+	if !ok || rap == nil {
+		return nil
+	}
+	retryAfter := rap.RetryAfter()
+	if retryAfter == nil {
+		return nil
+	}
+	val := *retryAfter
+	return &val
+}
 func statusCodeFromResult(err *Error) int {
 	if err == nil {
 		return 0
@@ -749,7 +783,7 @@ func statusCodeFromResult(err *Error) int {
 	return err.StatusCode()
 }
-func applyAuthFailureState(auth *Auth, resultErr *Error, now time.Time) {
+func applyAuthFailureState(auth *Auth, resultErr *Error, retryAfter *time.Duration, now time.Time) {
 	if auth == nil {
 		return
 	}
@@ -774,13 +808,17 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, now time.Time) {
 		auth.StatusMessage = "quota exhausted"
 		auth.Quota.Exceeded = true
 		auth.Quota.Reason = "quota"
-		cooldown, nextLevel := nextQuotaCooldown(auth.Quota.BackoffLevel)
 		var next time.Time
+		if retryAfter != nil {
+			next = now.Add(*retryAfter)
+		} else {
+			cooldown, nextLevel := nextQuotaCooldown(auth.Quota.BackoffLevel)
 			if cooldown > 0 {
 				next = now.Add(cooldown)
 			}
-		auth.Quota.NextRecoverAt = next
 			auth.Quota.BackoffLevel = nextLevel
+		}
+		auth.Quota.NextRecoverAt = next
 		auth.NextRetryAfter = next
 	case 408, 500, 502, 503, 504:
 		auth.StatusMessage = "transient upstream error"