From 5fda6f8ef373c897b267b7ed68761b466ae0a86d Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Wed, 17 Dec 2025 23:17:11 +0800 Subject: [PATCH] feat(antigravity): implement non-streaming execution for Claude model requests --- .../runtime/executor/antigravity_executor.go | 353 +++++++++++++++++- 1 file changed, 334 insertions(+), 19 deletions(-) diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index cd78a831..71d4841a 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -69,6 +69,10 @@ func (e *AntigravityExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Au // Execute performs a non-streaming request to the Antigravity API. func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { + if strings.Contains(req.Model, "claude") { + return e.executeClaudeNonStream(ctx, auth, req, opts) + } + token, updatedAuth, errToken := e.ensureAccessToken(ctx, auth) if errToken != nil { return resp, errToken @@ -77,25 +81,6 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au auth = updatedAuth } - if strings.Contains(req.Model, "claude") { - stream, errExecuteStream := e.ExecuteStream(ctx, auth, req, opts) - if errExecuteStream != nil { - return resp, errExecuteStream - } - - var buffer bytes.Buffer - for chunk := range stream { - if chunk.Err != nil { - return resp, chunk.Err - } - if len(chunk.Payload) > 0 { - _, _ = buffer.Write(chunk.Payload) - } - } - resp = cliproxyexecutor.Response{Payload: buffer.Bytes()} - return resp, nil - } - reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) @@ -179,6 +164,336 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au return resp, err } +// executeClaudeNonStream performs a claude non-streaming request to the Antigravity API. +func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { + token, updatedAuth, errToken := e.ensureAccessToken(ctx, auth) + if errToken != nil { + return resp, errToken + } + if updatedAuth != nil { + auth = updatedAuth + } + + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) + + from := opts.SourceFormat + to := sdktranslator.FromString("antigravity") + translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) + + translated = applyThinkingMetadataCLI(translated, req.Metadata, req.Model) + translated = util.ApplyDefaultThinkingIfNeededCLI(req.Model, translated) + translated = normalizeAntigravityThinking(req.Model, translated) + + baseURLs := antigravityBaseURLFallbackOrder(auth) + httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + + var lastStatus int + var lastBody []byte + var lastErr error + + for idx, baseURL := range baseURLs { + httpReq, errReq := e.buildRequest(ctx, auth, token, req.Model, translated, true, opts.Alt, baseURL) + if errReq != nil { + err = errReq + return resp, err + } + + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + recordAPIResponseError(ctx, e.cfg, errDo) + lastStatus = 0 + lastBody = nil + lastErr = errDo + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errDo + return resp, err + } + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + bodyBytes, errRead := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + if errRead != nil { + recordAPIResponseError(ctx, e.cfg, errRead) + lastStatus = 0 + lastBody = nil + lastErr = errRead + if idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = errRead + return resp, err + } + appendAPIResponseChunk(ctx, e.cfg, bodyBytes) + lastStatus = httpResp.StatusCode + lastBody = append([]byte(nil), bodyBytes...) + lastErr = nil + if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) { + log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) + continue + } + err = statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)} + return resp, err + } + + out := make(chan cliproxyexecutor.StreamChunk) + go func(resp *http.Response) { + defer close(out) + defer func() { + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(nil, streamScannerBuffer) + for scanner.Scan() { + line := scanner.Bytes() + appendAPIResponseChunk(ctx, e.cfg, line) + + // Filter usage metadata for all models + // Only retain usage statistics in the terminal chunk + line = FilterSSEUsageMetadata(line) + + payload := jsonPayload(line) + if payload == nil { + continue + } + + if detail, ok := parseAntigravityStreamUsage(payload); ok { + reporter.publish(ctx, detail) + } + + out <- cliproxyexecutor.StreamChunk{Payload: payload} + } + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} + } else { + reporter.ensurePublished(ctx) + } + }(httpResp) + + var buffer bytes.Buffer + for chunk := range out { + if chunk.Err != nil { + return resp, chunk.Err + } + if len(chunk.Payload) > 0 { + _, _ = buffer.Write(chunk.Payload) + _, _ = buffer.Write([]byte("\n")) + } + } + resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())} + + reporter.publish(ctx, parseAntigravityUsage(resp.Payload)) + var param any + converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, ¶m) + resp = cliproxyexecutor.Response{Payload: []byte(converted)} + reporter.ensurePublished(ctx) + + return resp, nil + } + + switch { + case lastStatus != 0: + err = statusErr{code: lastStatus, msg: string(lastBody)} + case lastErr != nil: + err = lastErr + default: + err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"} + } + return resp, err +} + +func (e *AntigravityExecutor) convertStreamToNonStream(stream []byte) []byte { + responseTemplate := "" + var traceID string + var finishReason string + var modelVersion string + var responseID string + var role string + var usageRaw string + parts := make([]map[string]interface{}, 0) + var pendingKind string + var pendingText strings.Builder + var pendingThoughtSig string + + flushPending := func() { + if pendingKind == "" { + return + } + text := pendingText.String() + switch pendingKind { + case "text": + if strings.TrimSpace(text) == "" { + pendingKind = "" + pendingText.Reset() + pendingThoughtSig = "" + return + } + parts = append(parts, map[string]interface{}{"text": text}) + case "thought": + if strings.TrimSpace(text) == "" && pendingThoughtSig == "" { + pendingKind = "" + pendingText.Reset() + pendingThoughtSig = "" + return + } + part := map[string]interface{}{"thought": true} + part["text"] = text + if pendingThoughtSig != "" { + part["thoughtSignature"] = pendingThoughtSig + } + parts = append(parts, part) + } + pendingKind = "" + pendingText.Reset() + pendingThoughtSig = "" + } + + normalizePart := func(partResult gjson.Result) map[string]interface{} { + var m map[string]interface{} + _ = json.Unmarshal([]byte(partResult.Raw), &m) + if m == nil { + m = map[string]interface{}{} + } + sig := partResult.Get("thoughtSignature").String() + if sig == "" { + sig = partResult.Get("thought_signature").String() + } + if sig != "" { + m["thoughtSignature"] = sig + delete(m, "thought_signature") + } + if inlineData, ok := m["inline_data"]; ok { + m["inlineData"] = inlineData + delete(m, "inline_data") + } + return m + } + + for _, line := range bytes.Split(stream, []byte("\n")) { + trimmed := bytes.TrimSpace(line) + if len(trimmed) == 0 || !gjson.ValidBytes(trimmed) { + continue + } + + root := gjson.ParseBytes(trimmed) + responseNode := root.Get("response") + if !responseNode.Exists() { + if root.Get("candidates").Exists() { + responseNode = root + } else { + continue + } + } + responseTemplate = responseNode.Raw + + if traceResult := root.Get("traceId"); traceResult.Exists() && traceResult.String() != "" { + traceID = traceResult.String() + } + + if roleResult := responseNode.Get("candidates.0.content.role"); roleResult.Exists() { + role = roleResult.String() + } + + if finishResult := responseNode.Get("candidates.0.finishReason"); finishResult.Exists() && finishResult.String() != "" { + finishReason = finishResult.String() + } + + if modelResult := responseNode.Get("modelVersion"); modelResult.Exists() && modelResult.String() != "" { + modelVersion = modelResult.String() + } + if responseIDResult := responseNode.Get("responseId"); responseIDResult.Exists() && responseIDResult.String() != "" { + responseID = responseIDResult.String() + } + if usageResult := responseNode.Get("usageMetadata"); usageResult.Exists() { + usageRaw = usageResult.Raw + } else if usageResult := root.Get("usageMetadata"); usageResult.Exists() { + usageRaw = usageResult.Raw + } + + if partsResult := responseNode.Get("candidates.0.content.parts"); partsResult.IsArray() { + for _, part := range partsResult.Array() { + hasFunctionCall := part.Get("functionCall").Exists() + hasInlineData := part.Get("inlineData").Exists() || part.Get("inline_data").Exists() + sig := part.Get("thoughtSignature").String() + if sig == "" { + sig = part.Get("thought_signature").String() + } + text := part.Get("text").String() + thought := part.Get("thought").Bool() + + if hasFunctionCall || hasInlineData { + flushPending() + parts = append(parts, normalizePart(part)) + continue + } + + if thought || part.Get("text").Exists() { + kind := "text" + if thought { + kind = "thought" + } + if pendingKind != "" && pendingKind != kind { + flushPending() + } + pendingKind = kind + pendingText.WriteString(text) + if kind == "thought" && sig != "" { + pendingThoughtSig = sig + } + continue + } + + flushPending() + parts = append(parts, normalizePart(part)) + } + } + } + flushPending() + + if responseTemplate == "" { + responseTemplate = `{"candidates":[{"content":{"role":"model","parts":[]}}]}` + } + + partsJSON, _ := json.Marshal(parts) + responseTemplate, _ = sjson.SetRaw(responseTemplate, "candidates.0.content.parts", string(partsJSON)) + if role != "" { + responseTemplate, _ = sjson.Set(responseTemplate, "candidates.0.content.role", role) + } + if finishReason != "" { + responseTemplate, _ = sjson.Set(responseTemplate, "candidates.0.finishReason", finishReason) + } + if modelVersion != "" { + responseTemplate, _ = sjson.Set(responseTemplate, "modelVersion", modelVersion) + } + if responseID != "" { + responseTemplate, _ = sjson.Set(responseTemplate, "responseId", responseID) + } + if usageRaw != "" { + responseTemplate, _ = sjson.SetRaw(responseTemplate, "usageMetadata", usageRaw) + } else if !gjson.Get(responseTemplate, "usageMetadata").Exists() { + responseTemplate, _ = sjson.Set(responseTemplate, "usageMetadata.promptTokenCount", 0) + responseTemplate, _ = sjson.Set(responseTemplate, "usageMetadata.candidatesTokenCount", 0) + responseTemplate, _ = sjson.Set(responseTemplate, "usageMetadata.totalTokenCount", 0) + } + + output := `{"response":{},"traceId":""}` + output, _ = sjson.SetRaw(output, "response", responseTemplate) + if traceID != "" { + output, _ = sjson.Set(output, "traceId", traceID) + } + return []byte(output) +} + // ExecuteStream performs a streaming request to the Antigravity API. func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { ctx = context.WithValue(ctx, "alt", "")