mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-02-03 04:50:52 +08:00
feat(executor): share SSE usage filtering across streams
This commit is contained in:
@@ -151,7 +151,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
|
|||||||
case wsrelay.MessageTypeStreamChunk:
|
case wsrelay.MessageTypeStreamChunk:
|
||||||
if len(event.Payload) > 0 {
|
if len(event.Payload) > 0 {
|
||||||
appendAPIResponseChunk(ctx, e.cfg, bytes.Clone(event.Payload))
|
appendAPIResponseChunk(ctx, e.cfg, bytes.Clone(event.Payload))
|
||||||
filtered := filterAIStudioUsageMetadata(event.Payload)
|
filtered := FilterSSEUsageMetadata(event.Payload)
|
||||||
if detail, ok := parseGeminiStreamUsage(filtered); ok {
|
if detail, ok := parseGeminiStreamUsage(filtered); ok {
|
||||||
reporter.publish(ctx, detail)
|
reporter.publish(ctx, detail)
|
||||||
}
|
}
|
||||||
@@ -296,65 +296,6 @@ func (e *AIStudioExecutor) buildEndpoint(model, action, alt string) string {
|
|||||||
return base
|
return base
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterAIStudioUsageMetadata removes usageMetadata from intermediate SSE events so that
|
|
||||||
// only the terminal chunk retains token statistics.
|
|
||||||
func filterAIStudioUsageMetadata(payload []byte) []byte {
|
|
||||||
if len(payload) == 0 {
|
|
||||||
return payload
|
|
||||||
}
|
|
||||||
|
|
||||||
lines := bytes.Split(payload, []byte("\n"))
|
|
||||||
modified := false
|
|
||||||
for idx, line := range lines {
|
|
||||||
trimmed := bytes.TrimSpace(line)
|
|
||||||
if len(trimmed) == 0 || !bytes.HasPrefix(trimmed, []byte("data:")) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dataIdx := bytes.Index(line, []byte("data:"))
|
|
||||||
if dataIdx < 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
rawJSON := bytes.TrimSpace(line[dataIdx+5:])
|
|
||||||
cleaned, changed := stripUsageMetadataFromJSON(rawJSON)
|
|
||||||
if !changed {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var rebuilt []byte
|
|
||||||
rebuilt = append(rebuilt, line[:dataIdx]...)
|
|
||||||
rebuilt = append(rebuilt, []byte("data:")...)
|
|
||||||
if len(cleaned) > 0 {
|
|
||||||
rebuilt = append(rebuilt, ' ')
|
|
||||||
rebuilt = append(rebuilt, cleaned...)
|
|
||||||
}
|
|
||||||
lines[idx] = rebuilt
|
|
||||||
modified = true
|
|
||||||
}
|
|
||||||
if !modified {
|
|
||||||
return payload
|
|
||||||
}
|
|
||||||
return bytes.Join(lines, []byte("\n"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// stripUsageMetadataFromJSON drops usageMetadata when no finishReason is present.
|
|
||||||
func stripUsageMetadataFromJSON(rawJSON []byte) ([]byte, bool) {
|
|
||||||
jsonBytes := bytes.TrimSpace(rawJSON)
|
|
||||||
if len(jsonBytes) == 0 || !gjson.ValidBytes(jsonBytes) {
|
|
||||||
return rawJSON, false
|
|
||||||
}
|
|
||||||
finishReason := gjson.GetBytes(jsonBytes, "candidates.0.finishReason")
|
|
||||||
if finishReason.Exists() && finishReason.String() != "" {
|
|
||||||
return rawJSON, false
|
|
||||||
}
|
|
||||||
if !gjson.GetBytes(jsonBytes, "usageMetadata").Exists() {
|
|
||||||
return rawJSON, false
|
|
||||||
}
|
|
||||||
cleaned, err := sjson.DeleteBytes(jsonBytes, "usageMetadata")
|
|
||||||
if err != nil {
|
|
||||||
return rawJSON, false
|
|
||||||
}
|
|
||||||
return cleaned, true
|
|
||||||
}
|
|
||||||
|
|
||||||
// ensureColonSpacedJSON normalizes JSON objects so that colons are followed by a single space while
|
// ensureColonSpacedJSON normalizes JSON objects so that colons are followed by a single space while
|
||||||
// keeping the payload otherwise compact. Non-JSON inputs are returned unchanged.
|
// keeping the payload otherwise compact. Non-JSON inputs are returned unchanged.
|
||||||
func ensureColonSpacedJSON(payload []byte) []byte {
|
func ensureColonSpacedJSON(payload []byte) []byte {
|
||||||
|
|||||||
@@ -167,6 +167,11 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya
|
|||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
line := scanner.Bytes()
|
line := scanner.Bytes()
|
||||||
appendAPIResponseChunk(ctx, e.cfg, line)
|
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||||
|
|
||||||
|
// Filter usage metadata for all models
|
||||||
|
// Only retain usage statistics in the terminal chunk
|
||||||
|
line = FilterSSEUsageMetadata(line)
|
||||||
|
|
||||||
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(line), ¶m)
|
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(line), ¶m)
|
||||||
for i := range chunks {
|
for i := range chunks {
|
||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||||
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage"
|
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
"github.com/tidwall/sjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
type usageReporter struct {
|
type usageReporter struct {
|
||||||
@@ -383,3 +384,94 @@ func jsonPayload(line []byte) []byte {
|
|||||||
}
|
}
|
||||||
return trimmed
|
return trimmed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FilterSSEUsageMetadata removes usageMetadata from intermediate SSE events so that
|
||||||
|
// only the terminal chunk retains token statistics.
|
||||||
|
// This function is shared between aistudio and antigravity executors.
|
||||||
|
func FilterSSEUsageMetadata(payload []byte) []byte {
|
||||||
|
if len(payload) == 0 {
|
||||||
|
return payload
|
||||||
|
}
|
||||||
|
|
||||||
|
lines := bytes.Split(payload, []byte("\n"))
|
||||||
|
modified := false
|
||||||
|
for idx, line := range lines {
|
||||||
|
trimmed := bytes.TrimSpace(line)
|
||||||
|
if len(trimmed) == 0 || !bytes.HasPrefix(trimmed, []byte("data:")) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dataIdx := bytes.Index(line, []byte("data:"))
|
||||||
|
if dataIdx < 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rawJSON := bytes.TrimSpace(line[dataIdx+5:])
|
||||||
|
cleaned, changed := StripUsageMetadataFromJSON(rawJSON)
|
||||||
|
if !changed {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var rebuilt []byte
|
||||||
|
rebuilt = append(rebuilt, line[:dataIdx]...)
|
||||||
|
rebuilt = append(rebuilt, []byte("data:")...)
|
||||||
|
if len(cleaned) > 0 {
|
||||||
|
rebuilt = append(rebuilt, ' ')
|
||||||
|
rebuilt = append(rebuilt, cleaned...)
|
||||||
|
}
|
||||||
|
lines[idx] = rebuilt
|
||||||
|
modified = true
|
||||||
|
}
|
||||||
|
if !modified {
|
||||||
|
return payload
|
||||||
|
}
|
||||||
|
return bytes.Join(lines, []byte("\n"))
|
||||||
|
}
|
||||||
|
|
||||||
|
// StripUsageMetadataFromJSON drops usageMetadata when no finishReason is present.
|
||||||
|
// This function is shared between aistudio and antigravity executors.
|
||||||
|
// It handles both formats:
|
||||||
|
// - Aistudio: candidates.0.finishReason
|
||||||
|
// - Antigravity: response.candidates.0.finishReason
|
||||||
|
func StripUsageMetadataFromJSON(rawJSON []byte) ([]byte, bool) {
|
||||||
|
jsonBytes := bytes.TrimSpace(rawJSON)
|
||||||
|
if len(jsonBytes) == 0 || !gjson.ValidBytes(jsonBytes) {
|
||||||
|
return rawJSON, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for finishReason in both aistudio and antigravity formats
|
||||||
|
finishReason := gjson.GetBytes(jsonBytes, "candidates.0.finishReason")
|
||||||
|
if !finishReason.Exists() {
|
||||||
|
finishReason = gjson.GetBytes(jsonBytes, "response.candidates.0.finishReason")
|
||||||
|
}
|
||||||
|
|
||||||
|
// If finishReason exists and is not empty, keep the usageMetadata
|
||||||
|
if finishReason.Exists() && finishReason.String() != "" {
|
||||||
|
return rawJSON, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for usageMetadata in both possible locations
|
||||||
|
usageMetadata := gjson.GetBytes(jsonBytes, "usageMetadata")
|
||||||
|
if !usageMetadata.Exists() {
|
||||||
|
usageMetadata = gjson.GetBytes(jsonBytes, "response.usageMetadata")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !usageMetadata.Exists() {
|
||||||
|
return rawJSON, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove usageMetadata from both possible locations
|
||||||
|
cleaned := jsonBytes
|
||||||
|
var changed bool
|
||||||
|
|
||||||
|
// Try to remove usageMetadata from root level
|
||||||
|
if gjson.GetBytes(cleaned, "usageMetadata").Exists() {
|
||||||
|
cleaned, _ = sjson.DeleteBytes(cleaned, "usageMetadata")
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to remove usageMetadata from response level
|
||||||
|
if gjson.GetBytes(cleaned, "response.usageMetadata").Exists() {
|
||||||
|
cleaned, _ = sjson.DeleteBytes(cleaned, "response.usageMetadata")
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return cleaned, changed
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user