feat(antigravity): implement non-streaming execution for Claude model requests

This commit is contained in:
Luis Pater
2025-12-17 23:17:11 +08:00
parent 09923f654c
commit 5fda6f8ef3

View File

@@ -69,6 +69,10 @@ func (e *AntigravityExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Au
// Execute performs a non-streaming request to the Antigravity API.
func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
if strings.Contains(req.Model, "claude") {
return e.executeClaudeNonStream(ctx, auth, req, opts)
}
token, updatedAuth, errToken := e.ensureAccessToken(ctx, auth)
if errToken != nil {
return resp, errToken
@@ -77,25 +81,6 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au
auth = updatedAuth
}
if strings.Contains(req.Model, "claude") {
stream, errExecuteStream := e.ExecuteStream(ctx, auth, req, opts)
if errExecuteStream != nil {
return resp, errExecuteStream
}
var buffer bytes.Buffer
for chunk := range stream {
if chunk.Err != nil {
return resp, chunk.Err
}
if len(chunk.Payload) > 0 {
_, _ = buffer.Write(chunk.Payload)
}
}
resp = cliproxyexecutor.Response{Payload: buffer.Bytes()}
return resp, nil
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
@@ -179,6 +164,336 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au
return resp, err
}
// executeClaudeNonStream performs a claude non-streaming request to the Antigravity API.
func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
token, updatedAuth, errToken := e.ensureAccessToken(ctx, auth)
if errToken != nil {
return resp, errToken
}
if updatedAuth != nil {
auth = updatedAuth
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("antigravity")
translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
translated = applyThinkingMetadataCLI(translated, req.Metadata, req.Model)
translated = util.ApplyDefaultThinkingIfNeededCLI(req.Model, translated)
translated = normalizeAntigravityThinking(req.Model, translated)
baseURLs := antigravityBaseURLFallbackOrder(auth)
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
var lastStatus int
var lastBody []byte
var lastErr error
for idx, baseURL := range baseURLs {
httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, true, opts.Alt, baseURL)
if errReq != nil {
err = errReq
return resp, err
}
httpResp, errDo := httpClient.Do(httpReq)
if errDo != nil {
recordAPIResponseError(ctx, e.cfg, errDo)
lastStatus = 0
lastBody = nil
lastErr = errDo
if idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
err = errDo
return resp, err
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
bodyBytes, errRead := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("antigravity executor: close response body error: %v", errClose)
}
if errRead != nil {
recordAPIResponseError(ctx, e.cfg, errRead)
lastStatus = 0
lastBody = nil
lastErr = errRead
if idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
err = errRead
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
lastStatus = httpResp.StatusCode
lastBody = append([]byte(nil), bodyBytes...)
lastErr = nil
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
return resp, err
}
out := make(chan cliproxyexecutor.StreamChunk)
go func(resp *http.Response) {
defer close(out)
defer func() {
if errClose := resp.Body.Close(); errClose != nil {
log.Errorf("antigravity executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(nil, streamScannerBuffer)
for scanner.Scan() {
line := scanner.Bytes()
appendAPIResponseChunk(ctx, e.cfg, line)
// Filter usage metadata for all models
// Only retain usage statistics in the terminal chunk
line = FilterSSEUsageMetadata(line)
payload := jsonPayload(line)
if payload == nil {
continue
}
if detail, ok := parseAntigravityStreamUsage(payload); ok {
reporter.publish(ctx, detail)
}
out <- cliproxyexecutor.StreamChunk{Payload: payload}
}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} else {
reporter.ensurePublished(ctx)
}
}(httpResp)
var buffer bytes.Buffer
for chunk := range out {
if chunk.Err != nil {
return resp, chunk.Err
}
if len(chunk.Payload) > 0 {
_, _ = buffer.Write(chunk.Payload)
_, _ = buffer.Write([]byte("\n"))
}
}
resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())}
reporter.publish(ctx, parseAntigravityUsage(resp.Payload))
var param any
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, &param)
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
reporter.ensurePublished(ctx)
return resp, nil
}
switch {
case lastStatus != 0:
err = statusErr{code: lastStatus, msg: string(lastBody)}
case lastErr != nil:
err = lastErr
default:
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
}
return resp, err
}
func (e *AntigravityExecutor) convertStreamToNonStream(stream []byte) []byte {
responseTemplate := ""
var traceID string
var finishReason string
var modelVersion string
var responseID string
var role string
var usageRaw string
parts := make([]map[string]interface{}, 0)
var pendingKind string
var pendingText strings.Builder
var pendingThoughtSig string
flushPending := func() {
if pendingKind == "" {
return
}
text := pendingText.String()
switch pendingKind {
case "text":
if strings.TrimSpace(text) == "" {
pendingKind = ""
pendingText.Reset()
pendingThoughtSig = ""
return
}
parts = append(parts, map[string]interface{}{"text": text})
case "thought":
if strings.TrimSpace(text) == "" && pendingThoughtSig == "" {
pendingKind = ""
pendingText.Reset()
pendingThoughtSig = ""
return
}
part := map[string]interface{}{"thought": true}
part["text"] = text
if pendingThoughtSig != "" {
part["thoughtSignature"] = pendingThoughtSig
}
parts = append(parts, part)
}
pendingKind = ""
pendingText.Reset()
pendingThoughtSig = ""
}
normalizePart := func(partResult gjson.Result) map[string]interface{} {
var m map[string]interface{}
_ = json.Unmarshal([]byte(partResult.Raw), &m)
if m == nil {
m = map[string]interface{}{}
}
sig := partResult.Get("thoughtSignature").String()
if sig == "" {
sig = partResult.Get("thought_signature").String()
}
if sig != "" {
m["thoughtSignature"] = sig
delete(m, "thought_signature")
}
if inlineData, ok := m["inline_data"]; ok {
m["inlineData"] = inlineData
delete(m, "inline_data")
}
return m
}
for _, line := range bytes.Split(stream, []byte("\n")) {
trimmed := bytes.TrimSpace(line)
if len(trimmed) == 0 || !gjson.ValidBytes(trimmed) {
continue
}
root := gjson.ParseBytes(trimmed)
responseNode := root.Get("response")
if !responseNode.Exists() {
if root.Get("candidates").Exists() {
responseNode = root
} else {
continue
}
}
responseTemplate = responseNode.Raw
if traceResult := root.Get("traceId"); traceResult.Exists() && traceResult.String() != "" {
traceID = traceResult.String()
}
if roleResult := responseNode.Get("candidates.0.content.role"); roleResult.Exists() {
role = roleResult.String()
}
if finishResult := responseNode.Get("candidates.0.finishReason"); finishResult.Exists() && finishResult.String() != "" {
finishReason = finishResult.String()
}
if modelResult := responseNode.Get("modelVersion"); modelResult.Exists() && modelResult.String() != "" {
modelVersion = modelResult.String()
}
if responseIDResult := responseNode.Get("responseId"); responseIDResult.Exists() && responseIDResult.String() != "" {
responseID = responseIDResult.String()
}
if usageResult := responseNode.Get("usageMetadata"); usageResult.Exists() {
usageRaw = usageResult.Raw
} else if usageResult := root.Get("usageMetadata"); usageResult.Exists() {
usageRaw = usageResult.Raw
}
if partsResult := responseNode.Get("candidates.0.content.parts"); partsResult.IsArray() {
for _, part := range partsResult.Array() {
hasFunctionCall := part.Get("functionCall").Exists()
hasInlineData := part.Get("inlineData").Exists() || part.Get("inline_data").Exists()
sig := part.Get("thoughtSignature").String()
if sig == "" {
sig = part.Get("thought_signature").String()
}
text := part.Get("text").String()
thought := part.Get("thought").Bool()
if hasFunctionCall || hasInlineData {
flushPending()
parts = append(parts, normalizePart(part))
continue
}
if thought || part.Get("text").Exists() {
kind := "text"
if thought {
kind = "thought"
}
if pendingKind != "" && pendingKind != kind {
flushPending()
}
pendingKind = kind
pendingText.WriteString(text)
if kind == "thought" && sig != "" {
pendingThoughtSig = sig
}
continue
}
flushPending()
parts = append(parts, normalizePart(part))
}
}
}
flushPending()
if responseTemplate == "" {
responseTemplate = `{"candidates":[{"content":{"role":"model","parts":[]}}]}`
}
partsJSON, _ := json.Marshal(parts)
responseTemplate, _ = sjson.SetRaw(responseTemplate, "candidates.0.content.parts", string(partsJSON))
if role != "" {
responseTemplate, _ = sjson.Set(responseTemplate, "candidates.0.content.role", role)
}
if finishReason != "" {
responseTemplate, _ = sjson.Set(responseTemplate, "candidates.0.finishReason", finishReason)
}
if modelVersion != "" {
responseTemplate, _ = sjson.Set(responseTemplate, "modelVersion", modelVersion)
}
if responseID != "" {
responseTemplate, _ = sjson.Set(responseTemplate, "responseId", responseID)
}
if usageRaw != "" {
responseTemplate, _ = sjson.SetRaw(responseTemplate, "usageMetadata", usageRaw)
} else if !gjson.Get(responseTemplate, "usageMetadata").Exists() {
responseTemplate, _ = sjson.Set(responseTemplate, "usageMetadata.promptTokenCount", 0)
responseTemplate, _ = sjson.Set(responseTemplate, "usageMetadata.candidatesTokenCount", 0)
responseTemplate, _ = sjson.Set(responseTemplate, "usageMetadata.totalTokenCount", 0)
}
output := `{"response":{},"traceId":""}`
output, _ = sjson.SetRaw(output, "response", responseTemplate)
if traceID != "" {
output, _ = sjson.Set(output, "traceId", traceID)
}
return []byte(output)
}
// ExecuteStream performs a streaming request to the Antigravity API.
func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
ctx = context.WithValue(ctx, "alt", "")