From 2af4a8dc12414f662837b2794bf0cee0ffbad77d Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Mon, 26 Jan 2026 06:22:46 +0800 Subject: [PATCH] refactor(runtime): implement retry logic for Antigravity executor with improved error handling and capacity management --- .../runtime/executor/antigravity_executor.go | 734 ++++++++++-------- 1 file changed, 424 insertions(+), 310 deletions(-) diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 1ceb0f73..a4156302 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -148,87 +148,108 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - var lastStatus int - var lastBody []byte - var lastErr error + attempts := antigravityRetryAttempts(e.cfg) - for idx, baseURL := range baseURLs { - httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, false, opts.Alt, baseURL) - if errReq != nil { - err = errReq - return resp, err - } +attemptLoop: + for attempt := 0; attempt < attempts; attempt++ { + var lastStatus int + var lastBody []byte + var lastErr error - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - recordAPIResponseError(ctx, e.cfg, errDo) - if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { - return resp, errDo + for idx, baseURL := range baseURLs { + httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, false, opts.Alt, baseURL) + if errReq != nil { + err = errReq + return resp, err } - lastStatus = 0 - lastBody = nil - lastErr = errDo - if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue + + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return resp, errDo + } + lastStatus = 0 + lastBody = nil + lastErr = errDo + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errDo + return resp, err } - err = errDo - return resp, err + + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + bodyBytes, errRead := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + err = errRead + return resp, err + } + appendAPIResponseChunk(ctx, e.cfg, bodyBytes) + + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes)) + lastStatus = httpResp.StatusCode + lastBody = append([]byte(nil), bodyBytes...) + lastErr = nil + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) { + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if attempt+1 < attempts { + delay := antigravityNoCapacityRetryDelay(attempt) + log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts) + if errWait := antigravityWait(ctx, delay); errWait != nil { + return resp, errWait + } + continue attemptLoop + } + } + sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} + if httpResp.StatusCode == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + sErr.retryAfter = retryAfter + } + } + err = sErr + return resp, err + } + + reporter.publish(ctx, parseAntigravityUsage(bodyBytes)) + var param any + converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m) + resp = cliproxyexecutor.Response{Payload: []byte(converted)} + reporter.ensurePublished(ctx) + return resp, nil } - recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - bodyBytes, errRead := io.ReadAll(httpResp.Body) - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - if errRead != nil { - recordAPIResponseError(ctx, e.cfg, errRead) - err = errRead - return resp, err - } - appendAPIResponseChunk(ctx, e.cfg, bodyBytes) - - if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { - log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes)) - lastStatus = httpResp.StatusCode - lastBody = append([]byte(nil), bodyBytes...) - lastErr = nil - if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue - } - sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} - if httpResp.StatusCode == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + switch { + case lastStatus != 0: + sErr := statusErr{code: lastStatus, msg: string(lastBody)} + if lastStatus == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { sErr.retryAfter = retryAfter } } err = sErr - return resp, err + case lastErr != nil: + err = lastErr + default: + err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} } - - reporter.publish(ctx, parseAntigravityUsage(bodyBytes)) - var param any - converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m) - resp = cliproxyexecutor.Response{Payload: []byte(converted)} - reporter.ensurePublished(ctx) - return resp, nil + return resp, err } - switch { - case lastStatus != 0: - sErr := statusErr{code: lastStatus, msg: string(lastBody)} - if lastStatus == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { - sErr.retryAfter = retryAfter - } - } - err = sErr - case lastErr != nil: - err = lastErr - default: - err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} - } return resp, err } @@ -268,150 +289,171 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth * baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - var lastStatus int - var lastBody []byte - var lastErr error + attempts := antigravityRetryAttempts(e.cfg) - for idx, baseURL := range baseURLs { - httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL) - if errReq != nil { - err = errReq - return resp, err - } +attemptLoop: + for attempt := 0; attempt < attempts; attempt++ { + var lastStatus int + var lastBody []byte + var lastErr error - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - recordAPIResponseError(ctx, e.cfg, errDo) - if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { - return resp, errDo + for idx, baseURL := range baseURLs { + httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL) + if errReq != nil { + err = errReq + return resp, err } - lastStatus = 0 - lastBody = nil - lastErr = errDo - if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue - } - err = errDo - return resp, err - } - recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { - bodyBytes, errRead := io.ReadAll(httpResp.Body) - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - if errRead != nil { - recordAPIResponseError(ctx, e.cfg, errRead) - if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { - err = errRead - return resp, err - } - if errCtx := ctx.Err(); errCtx != nil { - err = errCtx - return resp, err + + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return resp, errDo } lastStatus = 0 lastBody = nil - lastErr = errRead + lastErr = errDo if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue } - err = errRead + err = errDo return resp, err } - appendAPIResponseChunk(ctx, e.cfg, bodyBytes) - lastStatus = httpResp.StatusCode - lastBody = append([]byte(nil), bodyBytes...) - lastErr = nil - if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + bodyBytes, errRead := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { + err = errRead + return resp, err + } + if errCtx := ctx.Err(); errCtx != nil { + err = errCtx + return resp, err + } + lastStatus = 0 + lastBody = nil + lastErr = errRead + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errRead + return resp, err + } + appendAPIResponseChunk(ctx, e.cfg, bodyBytes) + lastStatus = httpResp.StatusCode + lastBody = append([]byte(nil), bodyBytes...) + lastErr = nil + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) { + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if attempt+1 < attempts { + delay := antigravityNoCapacityRetryDelay(attempt) + log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts) + if errWait := antigravityWait(ctx, delay); errWait != nil { + return resp, errWait + } + continue attemptLoop + } + } + sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} + if httpResp.StatusCode == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + sErr.retryAfter = retryAfter + } + } + err = sErr + return resp, err } - sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} - if httpResp.StatusCode == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + + out := make(chan cliproxyexecutor.StreamChunk) + go func(resp *http.Response) { + defer close(out) + defer func() { + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(nil, streamScannerBuffer) + for scanner.Scan() { + line := scanner.Bytes() + appendAPIResponseChunk(ctx, e.cfg, line) + + // Filter usage metadata for all models + // Only retain usage statistics in the terminal chunk + line = FilterSSEUsageMetadata(line) + + payload := jsonPayload(line) + if payload == nil { + continue + } + + if detail, ok := parseAntigravityStreamUsage(payload); ok { + reporter.publish(ctx, detail) + } + + out <- cliproxyexecutor.StreamChunk{Payload: payload} + } + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} + } else { + reporter.ensurePublished(ctx) + } + }(httpResp) + + var buffer bytes.Buffer + for chunk := range out { + if chunk.Err != nil { + return resp, chunk.Err + } + if len(chunk.Payload) > 0 { + _, _ = buffer.Write(chunk.Payload) + _, _ = buffer.Write([]byte("\n")) + } + } + resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())} + + reporter.publish(ctx, parseAntigravityUsage(resp.Payload)) + var param any + converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, ¶m) + resp = cliproxyexecutor.Response{Payload: []byte(converted)} + reporter.ensurePublished(ctx) + + return resp, nil + } + + switch { + case lastStatus != 0: + sErr := statusErr{code: lastStatus, msg: string(lastBody)} + if lastStatus == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { sErr.retryAfter = retryAfter } } err = sErr - return resp, err + case lastErr != nil: + err = lastErr + default: + err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} } - - out := make(chan cliproxyexecutor.StreamChunk) - go func(resp *http.Response) { - defer close(out) - defer func() { - if errClose := resp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - }() - scanner := bufio.NewScanner(resp.Body) - scanner.Buffer(nil, streamScannerBuffer) - for scanner.Scan() { - line := scanner.Bytes() - appendAPIResponseChunk(ctx, e.cfg, line) - - // Filter usage metadata for all models - // Only retain usage statistics in the terminal chunk - line = FilterSSEUsageMetadata(line) - - payload := jsonPayload(line) - if payload == nil { - continue - } - - if detail, ok := parseAntigravityStreamUsage(payload); ok { - reporter.publish(ctx, detail) - } - - out <- cliproxyexecutor.StreamChunk{Payload: payload} - } - if errScan := scanner.Err(); errScan != nil { - recordAPIResponseError(ctx, e.cfg, errScan) - reporter.publishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} - } else { - reporter.ensurePublished(ctx) - } - }(httpResp) - - var buffer bytes.Buffer - for chunk := range out { - if chunk.Err != nil { - return resp, chunk.Err - } - if len(chunk.Payload) > 0 { - _, _ = buffer.Write(chunk.Payload) - _, _ = buffer.Write([]byte("\n")) - } - } - resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())} - - reporter.publish(ctx, parseAntigravityUsage(resp.Payload)) - var param any - converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, ¶m) - resp = cliproxyexecutor.Response{Payload: []byte(converted)} - reporter.ensurePublished(ctx) - - return resp, nil + return resp, err } - switch { - case lastStatus != 0: - sErr := statusErr{code: lastStatus, msg: string(lastBody)} - if lastStatus == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { - sErr.retryAfter = retryAfter - } - } - err = sErr - case lastErr != nil: - err = lastErr - default: - err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} - } return resp, err } @@ -635,139 +677,160 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - var lastStatus int - var lastBody []byte - var lastErr error + attempts := antigravityRetryAttempts(e.cfg) - for idx, baseURL := range baseURLs { - httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL) - if errReq != nil { - err = errReq - return nil, err - } - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - recordAPIResponseError(ctx, e.cfg, errDo) - if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { - return nil, errDo +attemptLoop: + for attempt := 0; attempt < attempts; attempt++ { + var lastStatus int + var lastBody []byte + var lastErr error + + for idx, baseURL := range baseURLs { + httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL) + if errReq != nil { + err = errReq + return nil, err } - lastStatus = 0 - lastBody = nil - lastErr = errDo - if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue - } - err = errDo - return nil, err - } - recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { - bodyBytes, errRead := io.ReadAll(httpResp.Body) - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - if errRead != nil { - recordAPIResponseError(ctx, e.cfg, errRead) - if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { - err = errRead - return nil, err - } - if errCtx := ctx.Err(); errCtx != nil { - err = errCtx - return nil, err + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return nil, errDo } lastStatus = 0 lastBody = nil - lastErr = errRead + lastErr = errDo if idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue } - err = errRead + err = errDo return nil, err } - appendAPIResponseChunk(ctx, e.cfg, bodyBytes) - lastStatus = httpResp.StatusCode - lastBody = append([]byte(nil), bodyBytes...) - lastErr = nil - if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { - log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) - continue + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + bodyBytes, errRead := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { + err = errRead + return nil, err + } + if errCtx := ctx.Err(); errCtx != nil { + err = errCtx + return nil, err + } + lastStatus = 0 + lastBody = nil + lastErr = errRead + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errRead + return nil, err + } + appendAPIResponseChunk(ctx, e.cfg, bodyBytes) + lastStatus = httpResp.StatusCode + lastBody = append([]byte(nil), bodyBytes...) + lastErr = nil + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) { + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + if attempt+1 < attempts { + delay := antigravityNoCapacityRetryDelay(attempt) + log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts) + if errWait := antigravityWait(ctx, delay); errWait != nil { + return nil, errWait + } + continue attemptLoop + } + } + sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} + if httpResp.StatusCode == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + sErr.retryAfter = retryAfter + } + } + err = sErr + return nil, err } - sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} - if httpResp.StatusCode == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil { + + out := make(chan cliproxyexecutor.StreamChunk) + stream = out + go func(resp *http.Response) { + defer close(out) + defer func() { + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(nil, streamScannerBuffer) + var param any + for scanner.Scan() { + line := scanner.Bytes() + appendAPIResponseChunk(ctx, e.cfg, line) + + // Filter usage metadata for all models + // Only retain usage statistics in the terminal chunk + line = FilterSSEUsageMetadata(line) + + payload := jsonPayload(line) + if payload == nil { + continue + } + + if detail, ok := parseAntigravityStreamUsage(payload); ok { + reporter.publish(ctx, detail) + } + + chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m) + for i := range chunks { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} + } + } + tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m) + for i := range tail { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])} + } + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} + } else { + reporter.ensurePublished(ctx) + } + }(httpResp) + return stream, nil + } + + switch { + case lastStatus != 0: + sErr := statusErr{code: lastStatus, msg: string(lastBody)} + if lastStatus == http.StatusTooManyRequests { + if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { sErr.retryAfter = retryAfter } } err = sErr - return nil, err + case lastErr != nil: + err = lastErr + default: + err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} } - - out := make(chan cliproxyexecutor.StreamChunk) - stream = out - go func(resp *http.Response) { - defer close(out) - defer func() { - if errClose := resp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - }() - scanner := bufio.NewScanner(resp.Body) - scanner.Buffer(nil, streamScannerBuffer) - var param any - for scanner.Scan() { - line := scanner.Bytes() - appendAPIResponseChunk(ctx, e.cfg, line) - - // Filter usage metadata for all models - // Only retain usage statistics in the terminal chunk - line = FilterSSEUsageMetadata(line) - - payload := jsonPayload(line) - if payload == nil { - continue - } - - if detail, ok := parseAntigravityStreamUsage(payload); ok { - reporter.publish(ctx, detail) - } - - chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m) - for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} - } - } - tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m) - for i := range tail { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])} - } - if errScan := scanner.Err(); errScan != nil { - recordAPIResponseError(ctx, e.cfg, errScan) - reporter.publishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} - } else { - reporter.ensurePublished(ctx) - } - }(httpResp) - return stream, nil + return nil, err } - switch { - case lastStatus != 0: - sErr := statusErr{code: lastStatus, msg: string(lastBody)} - if lastStatus == http.StatusTooManyRequests { - if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil { - sErr.retryAfter = retryAfter - } - } - err = sErr - case lastErr != nil: - err = lastErr - default: - err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} - } return nil, err } @@ -1384,14 +1447,65 @@ func resolveUserAgent(auth *cliproxyauth.Auth) string { return defaultAntigravityAgent } +func antigravityRetryAttempts(cfg *config.Config) int { + if cfg == nil { + return 1 + } + retry := cfg.RequestRetry + if retry < 0 { + retry = 0 + } + attempts := retry + 1 + if attempts < 1 { + return 1 + } + return attempts +} + +func antigravityShouldRetryNoCapacity(statusCode int, body []byte) bool { + if statusCode != http.StatusServiceUnavailable { + return false + } + if len(body) == 0 { + return false + } + msg := strings.ToLower(string(body)) + return strings.Contains(msg, "no capacity available") +} + +func antigravityNoCapacityRetryDelay(attempt int) time.Duration { + if attempt < 0 { + attempt = 0 + } + delay := time.Duration(attempt+1) * 250 * time.Millisecond + if delay > 2*time.Second { + delay = 2 * time.Second + } + return delay +} + +func antigravityWait(ctx context.Context, wait time.Duration) error { + if wait <= 0 { + return nil + } + timer := time.NewTimer(wait) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + func antigravityBaseURLFallbackOrder(auth *cliproxyauth.Auth) []string { if base := resolveCustomAntigravityBaseURL(auth); base != "" { return []string{base} } return []string{ - antigravitySandboxBaseURLDaily, antigravityBaseURLDaily, - antigravityBaseURLProd, + antigravitySandboxBaseURLDaily, + // antigravityBaseURLProd, } }