mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-02-03 04:50:52 +08:00
refactor(runtime): implement retry logic for Antigravity executor with improved error handling and capacity management
This commit is contained in:
@@ -148,87 +148,108 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
|
|
||||||
var lastStatus int
|
attempts := antigravityRetryAttempts(e.cfg)
|
||||||
var lastBody []byte
|
|
||||||
var lastErr error
|
|
||||||
|
|
||||||
for idx, baseURL := range baseURLs {
|
attemptLoop:
|
||||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, false, opts.Alt, baseURL)
|
for attempt := 0; attempt < attempts; attempt++ {
|
||||||
if errReq != nil {
|
var lastStatus int
|
||||||
err = errReq
|
var lastBody []byte
|
||||||
return resp, err
|
var lastErr error
|
||||||
}
|
|
||||||
|
|
||||||
httpResp, errDo := httpClient.Do(httpReq)
|
for idx, baseURL := range baseURLs {
|
||||||
if errDo != nil {
|
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, false, opts.Alt, baseURL)
|
||||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
if errReq != nil {
|
||||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
err = errReq
|
||||||
return resp, errDo
|
return resp, err
|
||||||
}
|
}
|
||||||
lastStatus = 0
|
|
||||||
lastBody = nil
|
httpResp, errDo := httpClient.Do(httpReq)
|
||||||
lastErr = errDo
|
if errDo != nil {
|
||||||
if idx+1 < len(baseURLs) {
|
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||||
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||||
continue
|
return resp, errDo
|
||||||
|
}
|
||||||
|
lastStatus = 0
|
||||||
|
lastBody = nil
|
||||||
|
lastErr = errDo
|
||||||
|
if idx+1 < len(baseURLs) {
|
||||||
|
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err = errDo
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
err = errDo
|
|
||||||
return resp, err
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
|
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||||
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
if errRead != nil {
|
||||||
|
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||||
|
err = errRead
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
||||||
|
|
||||||
|
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||||
|
log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes))
|
||||||
|
lastStatus = httpResp.StatusCode
|
||||||
|
lastBody = append([]byte(nil), bodyBytes...)
|
||||||
|
lastErr = nil
|
||||||
|
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||||
|
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) {
|
||||||
|
if idx+1 < len(baseURLs) {
|
||||||
|
log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if attempt+1 < attempts {
|
||||||
|
delay := antigravityNoCapacityRetryDelay(attempt)
|
||||||
|
log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts)
|
||||||
|
if errWait := antigravityWait(ctx, delay); errWait != nil {
|
||||||
|
return resp, errWait
|
||||||
|
}
|
||||||
|
continue attemptLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
||||||
|
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||||
|
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
||||||
|
sErr.retryAfter = retryAfter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = sErr
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
reporter.publish(ctx, parseAntigravityUsage(bodyBytes))
|
||||||
|
var param any
|
||||||
|
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m)
|
||||||
|
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
||||||
|
reporter.ensurePublished(ctx)
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
switch {
|
||||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
case lastStatus != 0:
|
||||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
||||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
if lastStatus == http.StatusTooManyRequests {
|
||||||
}
|
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
||||||
if errRead != nil {
|
|
||||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
|
||||||
err = errRead
|
|
||||||
return resp, err
|
|
||||||
}
|
|
||||||
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
|
||||||
|
|
||||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
|
||||||
log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes))
|
|
||||||
lastStatus = httpResp.StatusCode
|
|
||||||
lastBody = append([]byte(nil), bodyBytes...)
|
|
||||||
lastErr = nil
|
|
||||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
|
||||||
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
|
||||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
|
||||||
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
|
||||||
sErr.retryAfter = retryAfter
|
sErr.retryAfter = retryAfter
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = sErr
|
err = sErr
|
||||||
return resp, err
|
case lastErr != nil:
|
||||||
|
err = lastErr
|
||||||
|
default:
|
||||||
|
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
||||||
}
|
}
|
||||||
|
return resp, err
|
||||||
reporter.publish(ctx, parseAntigravityUsage(bodyBytes))
|
|
||||||
var param any
|
|
||||||
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m)
|
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
|
||||||
reporter.ensurePublished(ctx)
|
|
||||||
return resp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
|
||||||
case lastStatus != 0:
|
|
||||||
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
|
||||||
if lastStatus == http.StatusTooManyRequests {
|
|
||||||
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
|
||||||
sErr.retryAfter = retryAfter
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err = sErr
|
|
||||||
case lastErr != nil:
|
|
||||||
err = lastErr
|
|
||||||
default:
|
|
||||||
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
|
||||||
}
|
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -268,150 +289,171 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth *
|
|||||||
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
|
|
||||||
var lastStatus int
|
attempts := antigravityRetryAttempts(e.cfg)
|
||||||
var lastBody []byte
|
|
||||||
var lastErr error
|
|
||||||
|
|
||||||
for idx, baseURL := range baseURLs {
|
attemptLoop:
|
||||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL)
|
for attempt := 0; attempt < attempts; attempt++ {
|
||||||
if errReq != nil {
|
var lastStatus int
|
||||||
err = errReq
|
var lastBody []byte
|
||||||
return resp, err
|
var lastErr error
|
||||||
}
|
|
||||||
|
|
||||||
httpResp, errDo := httpClient.Do(httpReq)
|
for idx, baseURL := range baseURLs {
|
||||||
if errDo != nil {
|
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL)
|
||||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
if errReq != nil {
|
||||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
err = errReq
|
||||||
return resp, errDo
|
return resp, err
|
||||||
}
|
}
|
||||||
lastStatus = 0
|
|
||||||
lastBody = nil
|
httpResp, errDo := httpClient.Do(httpReq)
|
||||||
lastErr = errDo
|
if errDo != nil {
|
||||||
if idx+1 < len(baseURLs) {
|
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||||
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||||
continue
|
return resp, errDo
|
||||||
}
|
|
||||||
err = errDo
|
|
||||||
return resp, err
|
|
||||||
}
|
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
|
||||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
|
||||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
|
||||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
|
||||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
|
||||||
}
|
|
||||||
if errRead != nil {
|
|
||||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
|
||||||
if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) {
|
|
||||||
err = errRead
|
|
||||||
return resp, err
|
|
||||||
}
|
|
||||||
if errCtx := ctx.Err(); errCtx != nil {
|
|
||||||
err = errCtx
|
|
||||||
return resp, err
|
|
||||||
}
|
}
|
||||||
lastStatus = 0
|
lastStatus = 0
|
||||||
lastBody = nil
|
lastBody = nil
|
||||||
lastErr = errRead
|
lastErr = errDo
|
||||||
if idx+1 < len(baseURLs) {
|
if idx+1 < len(baseURLs) {
|
||||||
log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = errRead
|
err = errDo
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
lastStatus = httpResp.StatusCode
|
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||||
lastBody = append([]byte(nil), bodyBytes...)
|
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||||
lastErr = nil
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||||
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
}
|
||||||
continue
|
if errRead != nil {
|
||||||
|
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||||
|
if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) {
|
||||||
|
err = errRead
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
if errCtx := ctx.Err(); errCtx != nil {
|
||||||
|
err = errCtx
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
lastStatus = 0
|
||||||
|
lastBody = nil
|
||||||
|
lastErr = errRead
|
||||||
|
if idx+1 < len(baseURLs) {
|
||||||
|
log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err = errRead
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
||||||
|
lastStatus = httpResp.StatusCode
|
||||||
|
lastBody = append([]byte(nil), bodyBytes...)
|
||||||
|
lastErr = nil
|
||||||
|
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||||
|
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) {
|
||||||
|
if idx+1 < len(baseURLs) {
|
||||||
|
log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if attempt+1 < attempts {
|
||||||
|
delay := antigravityNoCapacityRetryDelay(attempt)
|
||||||
|
log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts)
|
||||||
|
if errWait := antigravityWait(ctx, delay); errWait != nil {
|
||||||
|
return resp, errWait
|
||||||
|
}
|
||||||
|
continue attemptLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
||||||
|
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||||
|
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
||||||
|
sErr.retryAfter = retryAfter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = sErr
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
|
||||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
go func(resp *http.Response) {
|
||||||
|
defer close(out)
|
||||||
|
defer func() {
|
||||||
|
if errClose := resp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
scanner := bufio.NewScanner(resp.Body)
|
||||||
|
scanner.Buffer(nil, streamScannerBuffer)
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Bytes()
|
||||||
|
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||||
|
|
||||||
|
// Filter usage metadata for all models
|
||||||
|
// Only retain usage statistics in the terminal chunk
|
||||||
|
line = FilterSSEUsageMetadata(line)
|
||||||
|
|
||||||
|
payload := jsonPayload(line)
|
||||||
|
if payload == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if detail, ok := parseAntigravityStreamUsage(payload); ok {
|
||||||
|
reporter.publish(ctx, detail)
|
||||||
|
}
|
||||||
|
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Payload: payload}
|
||||||
|
}
|
||||||
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
|
reporter.publishFailure(ctx)
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
|
} else {
|
||||||
|
reporter.ensurePublished(ctx)
|
||||||
|
}
|
||||||
|
}(httpResp)
|
||||||
|
|
||||||
|
var buffer bytes.Buffer
|
||||||
|
for chunk := range out {
|
||||||
|
if chunk.Err != nil {
|
||||||
|
return resp, chunk.Err
|
||||||
|
}
|
||||||
|
if len(chunk.Payload) > 0 {
|
||||||
|
_, _ = buffer.Write(chunk.Payload)
|
||||||
|
_, _ = buffer.Write([]byte("\n"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())}
|
||||||
|
|
||||||
|
reporter.publish(ctx, parseAntigravityUsage(resp.Payload))
|
||||||
|
var param any
|
||||||
|
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, ¶m)
|
||||||
|
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
||||||
|
reporter.ensurePublished(ctx)
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case lastStatus != 0:
|
||||||
|
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
||||||
|
if lastStatus == http.StatusTooManyRequests {
|
||||||
|
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
||||||
sErr.retryAfter = retryAfter
|
sErr.retryAfter = retryAfter
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = sErr
|
err = sErr
|
||||||
return resp, err
|
case lastErr != nil:
|
||||||
|
err = lastErr
|
||||||
|
default:
|
||||||
|
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
||||||
}
|
}
|
||||||
|
return resp, err
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
|
||||||
go func(resp *http.Response) {
|
|
||||||
defer close(out)
|
|
||||||
defer func() {
|
|
||||||
if errClose := resp.Body.Close(); errClose != nil {
|
|
||||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
|
||||||
scanner.Buffer(nil, streamScannerBuffer)
|
|
||||||
for scanner.Scan() {
|
|
||||||
line := scanner.Bytes()
|
|
||||||
appendAPIResponseChunk(ctx, e.cfg, line)
|
|
||||||
|
|
||||||
// Filter usage metadata for all models
|
|
||||||
// Only retain usage statistics in the terminal chunk
|
|
||||||
line = FilterSSEUsageMetadata(line)
|
|
||||||
|
|
||||||
payload := jsonPayload(line)
|
|
||||||
if payload == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if detail, ok := parseAntigravityStreamUsage(payload); ok {
|
|
||||||
reporter.publish(ctx, detail)
|
|
||||||
}
|
|
||||||
|
|
||||||
out <- cliproxyexecutor.StreamChunk{Payload: payload}
|
|
||||||
}
|
|
||||||
if errScan := scanner.Err(); errScan != nil {
|
|
||||||
recordAPIResponseError(ctx, e.cfg, errScan)
|
|
||||||
reporter.publishFailure(ctx)
|
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
|
||||||
} else {
|
|
||||||
reporter.ensurePublished(ctx)
|
|
||||||
}
|
|
||||||
}(httpResp)
|
|
||||||
|
|
||||||
var buffer bytes.Buffer
|
|
||||||
for chunk := range out {
|
|
||||||
if chunk.Err != nil {
|
|
||||||
return resp, chunk.Err
|
|
||||||
}
|
|
||||||
if len(chunk.Payload) > 0 {
|
|
||||||
_, _ = buffer.Write(chunk.Payload)
|
|
||||||
_, _ = buffer.Write([]byte("\n"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())}
|
|
||||||
|
|
||||||
reporter.publish(ctx, parseAntigravityUsage(resp.Payload))
|
|
||||||
var param any
|
|
||||||
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, ¶m)
|
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
|
||||||
reporter.ensurePublished(ctx)
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
|
||||||
case lastStatus != 0:
|
|
||||||
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
|
||||||
if lastStatus == http.StatusTooManyRequests {
|
|
||||||
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
|
||||||
sErr.retryAfter = retryAfter
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err = sErr
|
|
||||||
case lastErr != nil:
|
|
||||||
err = lastErr
|
|
||||||
default:
|
|
||||||
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
|
||||||
}
|
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -635,139 +677,160 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya
|
|||||||
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
|
|
||||||
var lastStatus int
|
attempts := antigravityRetryAttempts(e.cfg)
|
||||||
var lastBody []byte
|
|
||||||
var lastErr error
|
|
||||||
|
|
||||||
for idx, baseURL := range baseURLs {
|
attemptLoop:
|
||||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL)
|
for attempt := 0; attempt < attempts; attempt++ {
|
||||||
if errReq != nil {
|
var lastStatus int
|
||||||
err = errReq
|
var lastBody []byte
|
||||||
return nil, err
|
var lastErr error
|
||||||
}
|
|
||||||
httpResp, errDo := httpClient.Do(httpReq)
|
for idx, baseURL := range baseURLs {
|
||||||
if errDo != nil {
|
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL)
|
||||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
if errReq != nil {
|
||||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
err = errReq
|
||||||
return nil, errDo
|
return nil, err
|
||||||
}
|
}
|
||||||
lastStatus = 0
|
httpResp, errDo := httpClient.Do(httpReq)
|
||||||
lastBody = nil
|
if errDo != nil {
|
||||||
lastErr = errDo
|
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||||
if idx+1 < len(baseURLs) {
|
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||||
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
return nil, errDo
|
||||||
continue
|
|
||||||
}
|
|
||||||
err = errDo
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
|
||||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
|
||||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
|
||||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
|
||||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
|
||||||
}
|
|
||||||
if errRead != nil {
|
|
||||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
|
||||||
if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) {
|
|
||||||
err = errRead
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if errCtx := ctx.Err(); errCtx != nil {
|
|
||||||
err = errCtx
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
lastStatus = 0
|
lastStatus = 0
|
||||||
lastBody = nil
|
lastBody = nil
|
||||||
lastErr = errRead
|
lastErr = errDo
|
||||||
if idx+1 < len(baseURLs) {
|
if idx+1 < len(baseURLs) {
|
||||||
log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = errRead
|
err = errDo
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
lastStatus = httpResp.StatusCode
|
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||||
lastBody = append([]byte(nil), bodyBytes...)
|
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||||
lastErr = nil
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||||
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
}
|
||||||
continue
|
if errRead != nil {
|
||||||
|
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||||
|
if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) {
|
||||||
|
err = errRead
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if errCtx := ctx.Err(); errCtx != nil {
|
||||||
|
err = errCtx
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
lastStatus = 0
|
||||||
|
lastBody = nil
|
||||||
|
lastErr = errRead
|
||||||
|
if idx+1 < len(baseURLs) {
|
||||||
|
log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err = errRead
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
||||||
|
lastStatus = httpResp.StatusCode
|
||||||
|
lastBody = append([]byte(nil), bodyBytes...)
|
||||||
|
lastErr = nil
|
||||||
|
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||||
|
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) {
|
||||||
|
if idx+1 < len(baseURLs) {
|
||||||
|
log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if attempt+1 < attempts {
|
||||||
|
delay := antigravityNoCapacityRetryDelay(attempt)
|
||||||
|
log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts)
|
||||||
|
if errWait := antigravityWait(ctx, delay); errWait != nil {
|
||||||
|
return nil, errWait
|
||||||
|
}
|
||||||
|
continue attemptLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
||||||
|
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||||
|
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
||||||
|
sErr.retryAfter = retryAfter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = sErr
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
|
||||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
stream = out
|
||||||
|
go func(resp *http.Response) {
|
||||||
|
defer close(out)
|
||||||
|
defer func() {
|
||||||
|
if errClose := resp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
scanner := bufio.NewScanner(resp.Body)
|
||||||
|
scanner.Buffer(nil, streamScannerBuffer)
|
||||||
|
var param any
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Bytes()
|
||||||
|
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||||
|
|
||||||
|
// Filter usage metadata for all models
|
||||||
|
// Only retain usage statistics in the terminal chunk
|
||||||
|
line = FilterSSEUsageMetadata(line)
|
||||||
|
|
||||||
|
payload := jsonPayload(line)
|
||||||
|
if payload == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if detail, ok := parseAntigravityStreamUsage(payload); ok {
|
||||||
|
reporter.publish(ctx, detail)
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m)
|
||||||
|
for i := range chunks {
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m)
|
||||||
|
for i := range tail {
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])}
|
||||||
|
}
|
||||||
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
|
reporter.publishFailure(ctx)
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
|
} else {
|
||||||
|
reporter.ensurePublished(ctx)
|
||||||
|
}
|
||||||
|
}(httpResp)
|
||||||
|
return stream, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case lastStatus != 0:
|
||||||
|
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
||||||
|
if lastStatus == http.StatusTooManyRequests {
|
||||||
|
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
||||||
sErr.retryAfter = retryAfter
|
sErr.retryAfter = retryAfter
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = sErr
|
err = sErr
|
||||||
return nil, err
|
case lastErr != nil:
|
||||||
|
err = lastErr
|
||||||
|
default:
|
||||||
|
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
||||||
}
|
}
|
||||||
|
return nil, err
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
|
||||||
stream = out
|
|
||||||
go func(resp *http.Response) {
|
|
||||||
defer close(out)
|
|
||||||
defer func() {
|
|
||||||
if errClose := resp.Body.Close(); errClose != nil {
|
|
||||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
|
||||||
scanner.Buffer(nil, streamScannerBuffer)
|
|
||||||
var param any
|
|
||||||
for scanner.Scan() {
|
|
||||||
line := scanner.Bytes()
|
|
||||||
appendAPIResponseChunk(ctx, e.cfg, line)
|
|
||||||
|
|
||||||
// Filter usage metadata for all models
|
|
||||||
// Only retain usage statistics in the terminal chunk
|
|
||||||
line = FilterSSEUsageMetadata(line)
|
|
||||||
|
|
||||||
payload := jsonPayload(line)
|
|
||||||
if payload == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if detail, ok := parseAntigravityStreamUsage(payload); ok {
|
|
||||||
reporter.publish(ctx, detail)
|
|
||||||
}
|
|
||||||
|
|
||||||
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m)
|
|
||||||
for i := range chunks {
|
|
||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m)
|
|
||||||
for i := range tail {
|
|
||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])}
|
|
||||||
}
|
|
||||||
if errScan := scanner.Err(); errScan != nil {
|
|
||||||
recordAPIResponseError(ctx, e.cfg, errScan)
|
|
||||||
reporter.publishFailure(ctx)
|
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
|
||||||
} else {
|
|
||||||
reporter.ensurePublished(ctx)
|
|
||||||
}
|
|
||||||
}(httpResp)
|
|
||||||
return stream, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
|
||||||
case lastStatus != 0:
|
|
||||||
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
|
||||||
if lastStatus == http.StatusTooManyRequests {
|
|
||||||
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
|
||||||
sErr.retryAfter = retryAfter
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err = sErr
|
|
||||||
case lastErr != nil:
|
|
||||||
err = lastErr
|
|
||||||
default:
|
|
||||||
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
|
||||||
}
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1384,14 +1447,65 @@ func resolveUserAgent(auth *cliproxyauth.Auth) string {
|
|||||||
return defaultAntigravityAgent
|
return defaultAntigravityAgent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func antigravityRetryAttempts(cfg *config.Config) int {
|
||||||
|
if cfg == nil {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
retry := cfg.RequestRetry
|
||||||
|
if retry < 0 {
|
||||||
|
retry = 0
|
||||||
|
}
|
||||||
|
attempts := retry + 1
|
||||||
|
if attempts < 1 {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
return attempts
|
||||||
|
}
|
||||||
|
|
||||||
|
func antigravityShouldRetryNoCapacity(statusCode int, body []byte) bool {
|
||||||
|
if statusCode != http.StatusServiceUnavailable {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if len(body) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
msg := strings.ToLower(string(body))
|
||||||
|
return strings.Contains(msg, "no capacity available")
|
||||||
|
}
|
||||||
|
|
||||||
|
func antigravityNoCapacityRetryDelay(attempt int) time.Duration {
|
||||||
|
if attempt < 0 {
|
||||||
|
attempt = 0
|
||||||
|
}
|
||||||
|
delay := time.Duration(attempt+1) * 250 * time.Millisecond
|
||||||
|
if delay > 2*time.Second {
|
||||||
|
delay = 2 * time.Second
|
||||||
|
}
|
||||||
|
return delay
|
||||||
|
}
|
||||||
|
|
||||||
|
func antigravityWait(ctx context.Context, wait time.Duration) error {
|
||||||
|
if wait <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
timer := time.NewTimer(wait)
|
||||||
|
defer timer.Stop()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-timer.C:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func antigravityBaseURLFallbackOrder(auth *cliproxyauth.Auth) []string {
|
func antigravityBaseURLFallbackOrder(auth *cliproxyauth.Auth) []string {
|
||||||
if base := resolveCustomAntigravityBaseURL(auth); base != "" {
|
if base := resolveCustomAntigravityBaseURL(auth); base != "" {
|
||||||
return []string{base}
|
return []string{base}
|
||||||
}
|
}
|
||||||
return []string{
|
return []string{
|
||||||
antigravitySandboxBaseURLDaily,
|
|
||||||
antigravityBaseURLDaily,
|
antigravityBaseURLDaily,
|
||||||
antigravityBaseURLProd,
|
antigravitySandboxBaseURLDaily,
|
||||||
|
// antigravityBaseURLProd,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user