**chore(executor): update default agent version and simplify const formatting**

- Updated `defaultAntigravityAgent` to version `1.11.5`.
- Adjusted const value formatting for improved readability.

**feat(executor): introduce fallback mechanism for Antigravity base URLs**

- Added retry logic with fallback order for Antigravity base URLs to handle request errors and rate limits.
- Refactored base URL handling with `antigravityBaseURLFallbackOrder` and related utilities.
- Enhanced error handling in non-streaming and streaming requests with retry support and improved metadata reporting.
- Updated `buildRequest` to support dynamic base URL assignment.
This commit is contained in:
Luis Pater
2025-11-23 14:50:58 +08:00
parent ac064389ca
commit 257621c5ed

View File

@@ -26,16 +26,18 @@ import (
) )
const ( const (
antigravityBaseURL = "https://daily-cloudcode-pa.sandbox.googleapis.com" antigravityBaseURLDaily = "https://daily-cloudcode-pa.sandbox.googleapis.com"
antigravityStreamPath = "/v1internal:streamGenerateContent" antigravityBaseURLAutopush = "https://autopush-cloudcode-pa.sandbox.googleapis.com"
antigravityGeneratePath = "/v1internal:generateContent" antigravityBaseURLProd = "https://cloudcode-pa.googleapis.com"
antigravityModelsPath = "/v1internal:fetchAvailableModels" antigravityStreamPath = "/v1internal:streamGenerateContent"
antigravityClientID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com" antigravityGeneratePath = "/v1internal:generateContent"
antigravityClientSecret = "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" antigravityModelsPath = "/v1internal:fetchAvailableModels"
defaultAntigravityAgent = "antigravity/1.11.3 windows/amd64" antigravityClientID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com"
antigravityAuthType = "antigravity" antigravityClientSecret = "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"
refreshSkew = 5 * time.Minute defaultAntigravityAgent = "antigravity/1.11.5 windows/amd64"
streamScannerBuffer int = 20_971_520 antigravityAuthType = "antigravity"
refreshSkew = 3000 * time.Second
streamScannerBuffer int = 20_971_520
) )
var randSource = rand.New(rand.NewSource(time.Now().UnixNano())) var randSource = rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -73,43 +75,76 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au
to := sdktranslator.FromString("antigravity") to := sdktranslator.FromString("antigravity")
translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false) translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false)
httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, false, opts.Alt) baseURLs := antigravityBaseURLFallbackOrder(auth)
if errReq != nil {
return resp, errReq
}
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, errDo := httpClient.Do(httpReq)
if errDo != nil { var lastStatus int
recordAPIResponseError(ctx, e.cfg, errDo) var lastBody []byte
return resp, errDo var lastErr error
}
defer func() { for idx, baseURL := range baseURLs {
httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, false, opts.Alt, baseURL)
if errReq != nil {
err = errReq
return resp, err
}
httpResp, errDo := httpClient.Do(httpReq)
if errDo != nil {
recordAPIResponseError(ctx, e.cfg, errDo)
lastStatus = 0
lastBody = nil
lastErr = errDo
if idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
err = errDo
return resp, err
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
bodyBytes, errRead := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil { if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("antigravity executor: close response body error: %v", errClose) log.Errorf("antigravity executor: close response body error: %v", errClose)
} }
}() if errRead != nil {
recordAPIResponseError(ctx, e.cfg, errRead)
err = errRead
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
bodyBytes, errRead := io.ReadAll(httpResp.Body) log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes))
if errRead != nil { lastStatus = httpResp.StatusCode
recordAPIResponseError(ctx, e.cfg, errRead) lastBody = append([]byte(nil), bodyBytes...)
return resp, errRead lastErr = nil
} if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
appendAPIResponseChunk(ctx, e.cfg, bodyBytes) log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
return resp, err
}
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { reporter.publish(ctx, parseAntigravityUsage(bodyBytes))
log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes)) var param any
err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, &param)
return resp, err resp = cliproxyexecutor.Response{Payload: []byte(converted)}
reporter.ensurePublished(ctx)
return resp, nil
} }
reporter.publish(ctx, parseAntigravityUsage(bodyBytes)) switch {
var param any case lastStatus != 0:
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, &param) err = statusErr{code: lastStatus, msg: string(lastBody)}
resp = cliproxyexecutor.Response{Payload: []byte(converted)} case lastErr != nil:
reporter.ensurePublished(ctx) err = lastErr
return resp, nil default:
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
}
return resp, err
} }
// ExecuteStream handles streaming requests via the antigravity upstream. // ExecuteStream handles streaming requests via the antigravity upstream.
@@ -131,75 +166,121 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya
to := sdktranslator.FromString("antigravity") to := sdktranslator.FromString("antigravity")
translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, true, opts.Alt) baseURLs := antigravityBaseURLFallbackOrder(auth)
if errReq != nil {
return nil, errReq
}
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, errDo := httpClient.Do(httpReq)
if errDo != nil {
recordAPIResponseError(ctx, e.cfg, errDo)
return nil, errDo
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
bodyBytes, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("antigravity executor: close response body error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk) var lastStatus int
stream = out var lastBody []byte
go func() { var lastErr error
defer close(out)
defer func() { for idx, baseURL := range baseURLs {
httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, true, opts.Alt, baseURL)
if errReq != nil {
err = errReq
return nil, err
}
httpResp, errDo := httpClient.Do(httpReq)
if errDo != nil {
recordAPIResponseError(ctx, e.cfg, errDo)
lastStatus = 0
lastBody = nil
lastErr = errDo
if idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
err = errDo
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
bodyBytes, errRead := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil { if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("antigravity executor: close response body error: %v", errClose) log.Errorf("antigravity executor: close response body error: %v", errClose)
} }
}() if errRead != nil {
scanner := bufio.NewScanner(httpResp.Body) recordAPIResponseError(ctx, e.cfg, errRead)
scanner.Buffer(nil, streamScannerBuffer) lastStatus = 0
var param any lastBody = nil
for scanner.Scan() { lastErr = errRead
line := scanner.Bytes() if idx+1 < len(baseURLs) {
appendAPIResponseChunk(ctx, e.cfg, line) log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
// Filter usage metadata for all models }
// Only retain usage statistics in the terminal chunk err = errRead
line = FilterSSEUsageMetadata(line) return nil, err
}
payload := jsonPayload(line) appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
if payload == nil { lastStatus = httpResp.StatusCode
lastBody = append([]byte(nil), bodyBytes...)
lastErr = nil
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue continue
} }
err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
return nil, err
}
if detail, ok := parseAntigravityStreamUsage(payload); ok { out := make(chan cliproxyexecutor.StreamChunk)
reporter.publish(ctx, detail) stream = out
} go func(resp *http.Response) {
defer close(out)
defer func() {
if errClose := resp.Body.Close(); errClose != nil {
log.Errorf("antigravity executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(nil, streamScannerBuffer)
var param any
for scanner.Scan() {
line := scanner.Bytes()
appendAPIResponseChunk(ctx, e.cfg, line)
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), &param) // Filter usage metadata for all models
for i := range chunks { // Only retain usage statistics in the terminal chunk
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} line = FilterSSEUsageMetadata(line)
payload := jsonPayload(line)
if payload == nil {
continue
}
if detail, ok := parseAntigravityStreamUsage(payload); ok {
reporter.publish(ctx, detail)
}
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), &param)
for i := range chunks {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
}
} }
} tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), &param)
tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), &param) for i := range tail {
for i := range tail { out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])}
out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])} }
} if errScan := scanner.Err(); errScan != nil {
if errScan := scanner.Err(); errScan != nil { recordAPIResponseError(ctx, e.cfg, errScan)
recordAPIResponseError(ctx, e.cfg, errScan) reporter.publishFailure(ctx)
reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan}
out <- cliproxyexecutor.StreamChunk{Err: errScan} } else {
} else { reporter.ensurePublished(ctx)
reporter.ensurePublished(ctx) }
} }(httpResp)
}() return stream, nil
return stream, nil }
switch {
case lastStatus != 0:
err = statusErr{code: lastStatus, msg: string(lastBody)}
case lastErr != nil:
err = lastErr
default:
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
}
return nil, err
} }
// Refresh refreshes the OAuth token using the refresh token. // Refresh refreshes the OAuth token using the refresh token.
@@ -230,57 +311,72 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c
auth = updatedAuth auth = updatedAuth
} }
modelsURL := buildBaseURL(auth) + antigravityModelsPath baseURLs := antigravityBaseURLFallbackOrder(auth)
httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, modelsURL, bytes.NewReader([]byte(`{}`)))
if errReq != nil {
return nil
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Authorization", "Bearer "+token)
httpReq.Header.Set("User-Agent", resolveUserAgent(auth))
if host := resolveHost(auth); host != "" {
httpReq.Host = host
}
httpClient := newProxyAwareHTTPClient(ctx, cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, cfg, auth, 0)
httpResp, errDo := httpClient.Do(httpReq)
if errDo != nil { for idx, baseURL := range baseURLs {
return nil modelsURL := baseURL + antigravityModelsPath
} httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, modelsURL, bytes.NewReader([]byte(`{}`)))
defer func() { if errReq != nil {
return nil
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Authorization", "Bearer "+token)
httpReq.Header.Set("User-Agent", resolveUserAgent(auth))
if host := resolveHost(baseURL); host != "" {
httpReq.Host = host
}
httpResp, errDo := httpClient.Do(httpReq)
if errDo != nil {
if idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: models request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
return nil
}
bodyBytes, errRead := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil { if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("antigravity executor: close response body error: %v", errClose) log.Errorf("antigravity executor: close response body error: %v", errClose)
} }
}() if errRead != nil {
if idx+1 < len(baseURLs) {
bodyBytes, errRead := io.ReadAll(httpResp.Body) log.Debugf("antigravity executor: models read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
if errRead != nil { continue
return nil }
} return nil
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
return nil
}
result := gjson.GetBytes(bodyBytes, "models")
if !result.Exists() {
return nil
}
now := time.Now().Unix()
models := make([]*registry.ModelInfo, 0, len(result.Map()))
for id := range result.Map() {
id = modelName2Alias(id)
if id != "" {
models = append(models, &registry.ModelInfo{
ID: id,
Object: "model",
Created: now,
OwnedBy: antigravityAuthType,
Type: antigravityAuthType,
})
} }
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: models request rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
return nil
}
result := gjson.GetBytes(bodyBytes, "models")
if !result.Exists() {
return nil
}
now := time.Now().Unix()
models := make([]*registry.ModelInfo, 0, len(result.Map()))
for id := range result.Map() {
id = modelName2Alias(id)
if id != "" {
models = append(models, &registry.ModelInfo{
ID: id,
Object: "model",
Created: now,
OwnedBy: antigravityAuthType,
Type: antigravityAuthType,
})
}
}
return models
} }
return models return nil
} }
func (e *AntigravityExecutor) ensureAccessToken(ctx context.Context, auth *cliproxyauth.Auth) (string, *cliproxyauth.Auth, error) { func (e *AntigravityExecutor) ensureAccessToken(ctx context.Context, auth *cliproxyauth.Auth) (string, *cliproxyauth.Auth, error) {
@@ -366,12 +462,15 @@ func (e *AntigravityExecutor) refreshToken(ctx context.Context, auth *cliproxyau
return auth, nil return auth, nil
} }
func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyauth.Auth, token, modelName string, payload []byte, stream bool, alt string) (*http.Request, error) { func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyauth.Auth, token, modelName string, payload []byte, stream bool, alt, baseURL string) (*http.Request, error) {
if token == "" { if token == "" {
return nil, statusErr{code: http.StatusUnauthorized, msg: "missing access token"} return nil, statusErr{code: http.StatusUnauthorized, msg: "missing access token"}
} }
base := buildBaseURL(auth) base := strings.TrimSuffix(baseURL, "/")
if base == "" {
base = buildBaseURL(auth)
}
path := antigravityGeneratePath path := antigravityGeneratePath
if stream { if stream {
path = antigravityStreamPath path = antigravityStreamPath
@@ -405,7 +504,7 @@ func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyau
} else { } else {
httpReq.Header.Set("Accept", "application/json") httpReq.Header.Set("Accept", "application/json")
} }
if host := resolveHost(auth); host != "" { if host := resolveHost(base); host != "" {
httpReq.Host = host httpReq.Host = host
} }
@@ -489,26 +588,13 @@ func int64Value(value any) (int64, bool) {
} }
func buildBaseURL(auth *cliproxyauth.Auth) string { func buildBaseURL(auth *cliproxyauth.Auth) string {
if auth != nil { if baseURLs := antigravityBaseURLFallbackOrder(auth); len(baseURLs) > 0 {
if auth.Attributes != nil { return baseURLs[0]
if v := strings.TrimSpace(auth.Attributes["base_url"]); v != "" {
return strings.TrimSuffix(v, "/")
}
}
if auth.Metadata != nil {
if v, ok := auth.Metadata["base_url"].(string); ok {
v = strings.TrimSpace(v)
if v != "" {
return strings.TrimSuffix(v, "/")
}
}
}
} }
return antigravityBaseURL return antigravityBaseURLAutopush
} }
func resolveHost(auth *cliproxyauth.Auth) string { func resolveHost(base string) string {
base := buildBaseURL(auth)
parsed, errParse := url.Parse(base) parsed, errParse := url.Parse(base)
if errParse != nil { if errParse != nil {
return "" return ""
@@ -535,6 +621,37 @@ func resolveUserAgent(auth *cliproxyauth.Auth) string {
return defaultAntigravityAgent return defaultAntigravityAgent
} }
func antigravityBaseURLFallbackOrder(auth *cliproxyauth.Auth) []string {
if base := resolveCustomAntigravityBaseURL(auth); base != "" {
return []string{base}
}
return []string{
antigravityBaseURLDaily,
antigravityBaseURLAutopush,
// antigravityBaseURLProd,
}
}
func resolveCustomAntigravityBaseURL(auth *cliproxyauth.Auth) string {
if auth == nil {
return ""
}
if auth.Attributes != nil {
if v := strings.TrimSpace(auth.Attributes["base_url"]); v != "" {
return strings.TrimSuffix(v, "/")
}
}
if auth.Metadata != nil {
if v, ok := auth.Metadata["base_url"].(string); ok {
v = strings.TrimSpace(v)
if v != "" {
return strings.TrimSuffix(v, "/")
}
}
}
return ""
}
func geminiToAntigravity(modelName string, payload []byte) []byte { func geminiToAntigravity(modelName string, payload []byte) []byte {
template, _ := sjson.Set(string(payload), "model", modelName) template, _ := sjson.Set(string(payload), "model", modelName)
template, _ = sjson.Set(template, "userAgent", "antigravity") template, _ = sjson.Set(template, "userAgent", "antigravity")
@@ -613,7 +730,7 @@ func modelName2Alias(modelName string) string {
return "gemini-claude-sonnet-4-5" return "gemini-claude-sonnet-4-5"
case "claude-sonnet-4-5-thinking": case "claude-sonnet-4-5-thinking":
return "gemini-claude-sonnet-4-5-thinking" return "gemini-claude-sonnet-4-5-thinking"
case "chat_20706", "chat_23310", "gemini-2.5-flash-thinking", "gemini-3-pro-low": case "chat_20706", "chat_23310", "gemini-2.5-flash-thinking", "gemini-3-pro-low", "gemini-2.5-pro":
return "" return ""
default: default:
return modelName return modelName