From ea6065f1b15e4567daa66cae94b91f0bb1ee944b Mon Sep 17 00:00:00 2001 From: hkfires <10558748+hkfires@users.noreply.github.com> Date: Sat, 25 Oct 2025 16:53:49 +0800 Subject: [PATCH] fix(aistudio): strip usage metadata from non-final stream chunks --- .../runtime/executor/aistudio_executor.go | 71 +++++++++++++++++-- 1 file changed, 66 insertions(+), 5 deletions(-) diff --git a/internal/runtime/executor/aistudio_executor.go b/internal/runtime/executor/aistudio_executor.go index 4bcdab3a..53de71c8 100644 --- a/internal/runtime/executor/aistudio_executor.go +++ b/internal/runtime/executor/aistudio_executor.go @@ -150,13 +150,15 @@ func (e *AistudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth case wsrelay.MessageTypeStreamChunk: if len(event.Payload) > 0 { appendAPIResponseChunk(ctx, e.cfg, bytes.Clone(event.Payload)) - if detail, ok := parseGeminiStreamUsage(event.Payload); ok { + filtered := filterAistudioUsageMetadata(event.Payload) + if detail, ok := parseGeminiStreamUsage(filtered); ok { reporter.publish(ctx, detail) } - } - lines := sdktranslator.TranslateStream(ctx, body.toFormat, opts.SourceFormat, req.Model, bytes.Clone(opts.OriginalRequest), translatedReq, bytes.Clone(event.Payload), &param) - for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])} + lines := sdktranslator.TranslateStream(ctx, body.toFormat, opts.SourceFormat, req.Model, bytes.Clone(opts.OriginalRequest), translatedReq, bytes.Clone(filtered), &param) + for i := range lines { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])} + } + break } case wsrelay.MessageTypeStreamEnd: return @@ -281,3 +283,62 @@ func (e *AistudioExecutor) buildEndpoint(model, action, alt string) string { } return base } + +// filterAistudioUsageMetadata removes usageMetadata from intermediate SSE events so that +// only the terminal chunk retains token statistics. 
+func filterAistudioUsageMetadata(payload []byte) []byte { + if len(payload) == 0 { + return payload + } + + lines := bytes.Split(payload, []byte("\n")) + modified := false + for idx, line := range lines { + trimmed := bytes.TrimSpace(line) + if len(trimmed) == 0 || !bytes.HasPrefix(trimmed, []byte("data:")) { + continue + } + dataIdx := bytes.Index(line, []byte("data:")) + if dataIdx < 0 { + continue + } + rawJSON := bytes.TrimSpace(line[dataIdx+5:]) + cleaned, changed := stripUsageMetadataFromJSON(rawJSON) + if !changed { + continue + } + var rebuilt []byte + rebuilt = append(rebuilt, line[:dataIdx]...) + rebuilt = append(rebuilt, []byte("data:")...) + if len(cleaned) > 0 { + rebuilt = append(rebuilt, ' ') + rebuilt = append(rebuilt, cleaned...) + } + lines[idx] = rebuilt + modified = true + } + if !modified { + return payload + } + return bytes.Join(lines, []byte("\n")) +} + +// stripUsageMetadataFromJSON drops usageMetadata when no finishReason is present. +func stripUsageMetadataFromJSON(rawJSON []byte) ([]byte, bool) { + jsonBytes := bytes.TrimSpace(rawJSON) + if len(jsonBytes) == 0 || !gjson.ValidBytes(jsonBytes) { + return rawJSON, false + } + finishReason := gjson.GetBytes(jsonBytes, "candidates.0.finishReason") + if finishReason.Exists() && finishReason.String() != "" { + return rawJSON, false + } + if !gjson.GetBytes(jsonBytes, "usageMetadata").Exists() { + return rawJSON, false + } + cleaned, err := sjson.DeleteBytes(jsonBytes, "usageMetadata") + if err != nil { + return rawJSON, false + } + return cleaned, true +}