From 257621c5ed15c53ac17edef055c16672cbf96ea3 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Sun, 23 Nov 2025 14:50:58 +0800 Subject: [PATCH] **chore(executor): update default agent version and simplify const formatting** - Updated `defaultAntigravityAgent` to version `1.11.5`. - Adjusted const value formatting for improved readability. **feat(executor): introduce fallback mechanism for Antigravity base URLs** - Added retry logic with fallback order for Antigravity base URLs to handle request errors and rate limits. - Refactored base URL handling with `antigravityBaseURLFallbackOrder` and related utilities. - Enhanced error handling in non-streaming and streaming requests with retry support and improved metadata reporting. - Updated `buildRequest` to support dynamic base URL assignment. --- .../runtime/executor/antigravity_executor.go | 445 +++++++++++------- 1 file changed, 281 insertions(+), 164 deletions(-) diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index c41d8d0e..94b84647 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -26,16 +26,18 @@ import ( ) const ( - antigravityBaseURL = "https://daily-cloudcode-pa.sandbox.googleapis.com" - antigravityStreamPath = "/v1internal:streamGenerateContent" - antigravityGeneratePath = "/v1internal:generateContent" - antigravityModelsPath = "/v1internal:fetchAvailableModels" - antigravityClientID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com" - antigravityClientSecret = "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" - defaultAntigravityAgent = "antigravity/1.11.3 windows/amd64" - antigravityAuthType = "antigravity" - refreshSkew = 5 * time.Minute - streamScannerBuffer int = 20_971_520 + antigravityBaseURLDaily = "https://daily-cloudcode-pa.sandbox.googleapis.com" + antigravityBaseURLAutopush = "https://autopush-cloudcode-pa.sandbox.googleapis.com" + antigravityBaseURLProd = "https://cloudcode-pa.googleapis.com" + antigravityStreamPath = "/v1internal:streamGenerateContent" + antigravityGeneratePath = "/v1internal:generateContent" + antigravityModelsPath = "/v1internal:fetchAvailableModels" + antigravityClientID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com" + antigravityClientSecret = "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" + defaultAntigravityAgent = "antigravity/1.11.5 windows/amd64" + antigravityAuthType = "antigravity" + refreshSkew = 3000 * time.Second + streamScannerBuffer int = 20_971_520 ) var randSource = rand.New(rand.NewSource(time.Now().UnixNano())) @@ -73,43 +75,76 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au to := sdktranslator.FromString("antigravity") translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false) - httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, false, opts.Alt) - if errReq != nil { - return resp, errReq - } - + baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - recordAPIResponseError(ctx, e.cfg, errDo) - return resp, errDo - } - defer func() { + + var lastStatus int + var lastBody []byte + var lastErr error + + for idx, baseURL := range baseURLs { + httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, false, opts.Alt, baseURL) + if errReq != nil { + err = errReq + return resp, err + } + + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + recordAPIResponseError(ctx, e.cfg, errDo) + lastStatus = 0 + lastBody = nil + lastErr = errDo + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errDo + return resp, err + } + + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + bodyBytes, errRead := io.ReadAll(httpResp.Body) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("antigravity executor: close response body error: %v", errClose) } - }() + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + err = errRead + return resp, err + } + appendAPIResponseChunk(ctx, e.cfg, bodyBytes) - recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - bodyBytes, errRead := io.ReadAll(httpResp.Body) - if errRead != nil { - recordAPIResponseError(ctx, e.cfg, errRead) - return resp, errRead - } - appendAPIResponseChunk(ctx, e.cfg, bodyBytes) + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes)) + lastStatus = httpResp.StatusCode + lastBody = append([]byte(nil), bodyBytes...) + lastErr = nil + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} + return resp, err + } - if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { - log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes)) - err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} - return resp, err + reporter.publish(ctx, parseAntigravityUsage(bodyBytes)) + var param any + converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m) + resp = cliproxyexecutor.Response{Payload: []byte(converted)} + reporter.ensurePublished(ctx) + return resp, nil } - reporter.publish(ctx, parseAntigravityUsage(bodyBytes)) - var param any - converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m) - resp = cliproxyexecutor.Response{Payload: []byte(converted)} - reporter.ensurePublished(ctx) - return resp, nil + switch { + case lastStatus != 0: + err = statusErr{code: lastStatus, msg: string(lastBody)} + case lastErr != nil: + err = lastErr + default: + err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} + } + return resp, err } // ExecuteStream handles streaming requests via the antigravity upstream. @@ -131,75 +166,121 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya to := sdktranslator.FromString("antigravity") translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) - httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, true, opts.Alt) - if errReq != nil { - return nil, errReq - } - + baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - recordAPIResponseError(ctx, e.cfg, errDo) - return nil, errDo - } - recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { - bodyBytes, _ := io.ReadAll(httpResp.Body) - appendAPIResponseChunk(ctx, e.cfg, bodyBytes) - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close response body error: %v", errClose) - } - err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} - return nil, err - } - out := make(chan cliproxyexecutor.StreamChunk) - stream = out - go func() { - defer close(out) - defer func() { + var lastStatus int + var lastBody []byte + var lastErr error + + for idx, baseURL := range baseURLs { + httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, true, opts.Alt, baseURL) + if errReq != nil { + err = errReq + return nil, err + } + + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + recordAPIResponseError(ctx, e.cfg, errDo) + lastStatus = 0 + lastBody = nil + lastErr = errDo + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errDo + return nil, err + } + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + bodyBytes, errRead := io.ReadAll(httpResp.Body) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("antigravity executor: close response body error: %v", errClose) } - }() - scanner := bufio.NewScanner(httpResp.Body) - scanner.Buffer(nil, streamScannerBuffer) - var param any - for scanner.Scan() { - line := scanner.Bytes() - appendAPIResponseChunk(ctx, e.cfg, line) - - // Filter usage metadata for all models - // Only retain usage statistics in the terminal chunk - line = FilterSSEUsageMetadata(line) - - payload := jsonPayload(line) - if payload == nil { + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + lastStatus = 0 + lastBody = nil + lastErr = errRead + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errRead + return nil, err + } + appendAPIResponseChunk(ctx, e.cfg, bodyBytes) + lastStatus = httpResp.StatusCode + lastBody = append([]byte(nil), bodyBytes...) + lastErr = nil + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue } + err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} + return nil, err + } - if detail, ok := parseAntigravityStreamUsage(payload); ok { - reporter.publish(ctx, detail) - } + out := make(chan cliproxyexecutor.StreamChunk) + stream = out + go func(resp *http.Response) { + defer close(out) + defer func() { + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(nil, streamScannerBuffer) + var param any + for scanner.Scan() { + line := scanner.Bytes() + appendAPIResponseChunk(ctx, e.cfg, line) - chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m) - for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} + // Filter usage metadata for all models + // Only retain usage statistics in the terminal chunk + line = FilterSSEUsageMetadata(line) + + payload := jsonPayload(line) + if payload == nil { + continue + } + + if detail, ok := parseAntigravityStreamUsage(payload); ok { + reporter.publish(ctx, detail) + } + + chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m) + for i := range chunks { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} + } } - } - tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m) - for i := range tail { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])} - } - if errScan := scanner.Err(); errScan != nil { - recordAPIResponseError(ctx, e.cfg, errScan) - reporter.publishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} - } else { - reporter.ensurePublished(ctx) - } - }() - return stream, nil + tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m) + for i := range tail { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])} + } + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} + } else { + reporter.ensurePublished(ctx) + } + }(httpResp) + return stream, nil + } + + switch { + case lastStatus != 0: + err = statusErr{code: lastStatus, msg: string(lastBody)} + case lastErr != nil: + err = lastErr + default: + err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} + } + return nil, err } // Refresh refreshes the OAuth token using the refresh token. @@ -230,57 +311,72 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c auth = updatedAuth } - modelsURL := buildBaseURL(auth) + antigravityModelsPath - httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, modelsURL, bytes.NewReader([]byte(`{}`))) - if errReq != nil { - return nil - } - httpReq.Header.Set("Content-Type", "application/json") - httpReq.Header.Set("Authorization", "Bearer "+token) - httpReq.Header.Set("User-Agent", resolveUserAgent(auth)) - if host := resolveHost(auth); host != "" { - httpReq.Host = host - } - + baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newProxyAwareHTTPClient(ctx, cfg, auth, 0) - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - return nil - } - defer func() { + + for idx, baseURL := range baseURLs { + modelsURL := baseURL + antigravityModelsPath + httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, modelsURL, bytes.NewReader([]byte(`{}`))) + if errReq != nil { + return nil + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Authorization", "Bearer "+token) + httpReq.Header.Set("User-Agent", resolveUserAgent(auth)) + if host := resolveHost(baseURL); host != "" { + httpReq.Host = host + } + + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: models request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + return nil + } + + bodyBytes, errRead := io.ReadAll(httpResp.Body) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("antigravity executor: close response body error: %v", errClose) } - }() - - bodyBytes, errRead := io.ReadAll(httpResp.Body) - if errRead != nil { - return nil - } - if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { - return nil - } - - result := gjson.GetBytes(bodyBytes, "models") - if !result.Exists() { - return nil - } - - now := time.Now().Unix() - models := make([]*registry.ModelInfo, 0, len(result.Map())) - for id := range result.Map() { - id = modelName2Alias(id) - if id != "" { - models = append(models, ®istry.ModelInfo{ - ID: id, - Object: "model", - Created: now, - OwnedBy: antigravityAuthType, - Type: antigravityAuthType, - }) + if errRead != nil { + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: models read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + return nil } + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: models request rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + return nil + } + + result := gjson.GetBytes(bodyBytes, "models") + if !result.Exists() { + return nil + } + + now := time.Now().Unix() + models := make([]*registry.ModelInfo, 0, len(result.Map())) + for id := range result.Map() { + id = modelName2Alias(id) + if id != "" { + models = append(models, ®istry.ModelInfo{ + ID: id, + Object: "model", + Created: now, + OwnedBy: antigravityAuthType, + Type: antigravityAuthType, + }) + } + } + return models } - return models + return nil } func (e *AntigravityExecutor) ensureAccessToken(ctx context.Context, auth *cliproxyauth.Auth) (string, *cliproxyauth.Auth, error) { @@ -366,12 +462,15 @@ func (e *AntigravityExecutor) refreshToken(ctx context.Context, auth *cliproxyau return auth, nil } -func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyauth.Auth, token, modelName string, payload []byte, stream bool, alt string) (*http.Request, error) { +func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyauth.Auth, token, modelName string, payload []byte, stream bool, alt, baseURL string) (*http.Request, error) { if token == "" { return nil, statusErr{code: http.StatusUnauthorized, msg: "missing access token"} } - base := buildBaseURL(auth) + base := strings.TrimSuffix(baseURL, "/") + if base == "" { + base = buildBaseURL(auth) + } path := antigravityGeneratePath if stream { path = antigravityStreamPath @@ -405,7 +504,7 @@ func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyau } else { httpReq.Header.Set("Accept", "application/json") } - if host := resolveHost(auth); host != "" { + if host := resolveHost(base); host != "" { httpReq.Host = host } @@ -489,26 +588,13 @@ func int64Value(value any) (int64, bool) { } func buildBaseURL(auth *cliproxyauth.Auth) string { - if auth != nil { - if auth.Attributes != nil { - if v := strings.TrimSpace(auth.Attributes["base_url"]); v != "" { - return strings.TrimSuffix(v, "/") - } - } - if auth.Metadata != nil { - if v, ok := auth.Metadata["base_url"].(string); ok { - v = strings.TrimSpace(v) - if v != "" { - return strings.TrimSuffix(v, "/") - } - } - } + if baseURLs := antigravityBaseURLFallbackOrder(auth); len(baseURLs) > 0 { + return baseURLs[0] } - return antigravityBaseURL + return antigravityBaseURLAutopush } -func resolveHost(auth *cliproxyauth.Auth) string { - base := buildBaseURL(auth) +func resolveHost(base string) string { parsed, errParse := url.Parse(base) if errParse != nil { return "" @@ -535,6 +621,37 @@ func resolveUserAgent(auth *cliproxyauth.Auth) string { return defaultAntigravityAgent } +func antigravityBaseURLFallbackOrder(auth *cliproxyauth.Auth) []string { + if base := resolveCustomAntigravityBaseURL(auth); base != "" { + return []string{base} + } + return []string{ + antigravityBaseURLDaily, + antigravityBaseURLAutopush, + // antigravityBaseURLProd, + } +} + +func resolveCustomAntigravityBaseURL(auth *cliproxyauth.Auth) string { + if auth == nil { + return "" + } + if auth.Attributes != nil { + if v := strings.TrimSpace(auth.Attributes["base_url"]); v != "" { + return strings.TrimSuffix(v, "/") + } + } + if auth.Metadata != nil { + if v, ok := auth.Metadata["base_url"].(string); ok { + v = strings.TrimSpace(v) + if v != "" { + return strings.TrimSuffix(v, "/") + } + } + } + return "" +} + func geminiToAntigravity(modelName string, payload []byte) []byte { template, _ := sjson.Set(string(payload), "model", modelName) template, _ = sjson.Set(template, "userAgent", "antigravity") @@ -613,7 +730,7 @@ func modelName2Alias(modelName string) string { return "gemini-claude-sonnet-4-5" case "claude-sonnet-4-5-thinking": return "gemini-claude-sonnet-4-5-thinking" - case "chat_20706", "chat_23310", "gemini-2.5-flash-thinking", "gemini-3-pro-low": + case "chat_20706", "chat_23310", "gemini-2.5-flash-thinking", "gemini-3-pro-low", "gemini-2.5-pro": return "" default: return modelName