diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index c9665385..fdd7571a 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -43,10 +43,12 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r if baseURL == "" { baseURL = "https://api.anthropic.com" } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("claude") // Use streaming translation to preserve function calling, except for claude. - body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), from != to) + stream := from != to + body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), stream) if !strings.HasPrefix(req.Model, "claude-3-5-haiku") { body, _ = sjson.SetRawBytes(body, "system", []byte(misc.ClaudeCodeInstructions)) @@ -94,6 +96,16 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r return cliproxyexecutor.Response{}, err } appendAPIResponseChunk(ctx, e.cfg, data) + if stream { + lines := bytes.Split(data, []byte("\n")) + for _, line := range lines { + if detail, ok := parseClaudeStreamUsage(line); ok { + reporter.publish(ctx, detail) + } + } + } else { + reporter.publish(ctx, parseClaudeUsage(data)) + } var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) return cliproxyexecutor.Response{Payload: []byte(out)}, nil @@ -107,6 +119,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A if baseURL == "" { baseURL = "https://api.anthropic.com" } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("claude") body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) @@ -146,6 +159,9 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) + if detail, ok := parseClaudeStreamUsage(line); ok { + reporter.publish(ctx, detail) + } chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 3de770f2..441b0f38 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -18,12 +18,15 @@ import ( cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" "github.com/tidwall/sjson" "github.com/gin-gonic/gin" "github.com/google/uuid" ) +var dataTag = []byte("data:") + // CodexExecutor is a stateless executor for Codex (OpenAI Responses API entrypoint). // If api_key is unavailable on auth, it falls back to legacy via ClientAdapter. type CodexExecutor struct { @@ -44,6 +47,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re if baseURL == "" { baseURL = "https://chatgpt.com/backend-api/codex" } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("codex") @@ -75,6 +79,8 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re } } + body, _ = sjson.SetBytes(body, "stream", true) + url := strings.TrimSuffix(baseURL, "/") + "/responses" recordAPIRequest(ctx, e.cfg, body) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) @@ -103,9 +109,27 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re return cliproxyexecutor.Response{}, err } appendAPIResponseChunk(ctx, e.cfg, data) - var param any - out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) - return cliproxyexecutor.Response{Payload: []byte(out)}, nil + + lines := bytes.Split(data, []byte("\n")) + for _, line := range lines { + if !bytes.HasPrefix(line, dataTag) { + continue + } + + line = bytes.TrimSpace(line[5:]) + if gjson.GetBytes(line, "type").String() != "response.completed" { + continue + } + + if detail, ok := parseCodexUsage(line); ok { + reporter.publish(ctx, detail) + } + + var param any + out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, line, ¶m) + return cliproxyexecutor.Response{Payload: []byte(out)}, nil + } + return cliproxyexecutor.Response{}, statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"} } func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { @@ -116,6 +140,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au if baseURL == "" { baseURL = "https://chatgpt.com/backend-api/codex" } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("codex") @@ -181,6 +206,16 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) + + if bytes.HasPrefix(line, dataTag) { + data := bytes.TrimSpace(line[5:]) + if gjson.GetBytes(data, "type").String() == "response.completed" { + if detail, ok := parseCodexUsage(data); ok { + reporter.publish(ctx, detail) + } + } + } + chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} diff --git a/internal/runtime/executor/gemini_cli_executor.go b/internal/runtime/executor/gemini_cli_executor.go index a4b24b20..7284a570 100644 --- a/internal/runtime/executor/gemini_cli_executor.go +++ b/internal/runtime/executor/gemini_cli_executor.go @@ -54,6 +54,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth if err != nil { return cliproxyexecutor.Response{}, err } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("gemini-cli") @@ -117,6 +118,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth _ = resp.Body.Close() appendAPIResponseChunk(ctx, e.cfg, data) if resp.StatusCode >= 200 && resp.StatusCode < 300 { + reporter.publish(ctx, parseGeminiCLIUsage(data)) var param any out := sdktranslator.TranslateNonStream(respCtx, to, from, attemptModel, bytes.Clone(opts.OriginalRequest), payload, data, ¶m) return cliproxyexecutor.Response{Payload: []byte(out)}, nil @@ -139,6 +141,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut if err != nil { return nil, err } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("gemini-cli") @@ -215,6 +218,9 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) + if detail, ok := parseGeminiCLIStreamUsage(line); ok { + reporter.publish(ctx, detail) + } if bytes.HasPrefix(line, dataTag) { segments := sdktranslator.TranslateStream(respCtx, to, from, attempt, bytes.Clone(opts.OriginalRequest), reqBody, bytes.Clone(line), ¶m) for i := range segments { @@ -239,6 +245,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut return } appendAPIResponseChunk(ctx, e.cfg, data) + reporter.publish(ctx, parseGeminiCLIUsage(data)) var param any segments := sdktranslator.TranslateStream(respCtx, to, from, attempt, bytes.Clone(opts.OriginalRequest), reqBody, data, ¶m) for i := range segments { diff --git a/internal/runtime/executor/gemini_executor.go b/internal/runtime/executor/gemini_executor.go index 9f2a3457..f652f952 100644 --- a/internal/runtime/executor/gemini_executor.go +++ b/internal/runtime/executor/gemini_executor.go @@ -43,6 +43,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r // Fallback to legacy client return NewClientAdapter("gemini").Execute(ctx, auth, req, opts) } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) // Official Gemini API via API key or OAuth bearer from := opts.SourceFormat @@ -92,6 +93,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r return cliproxyexecutor.Response{}, err } appendAPIResponseChunk(ctx, e.cfg, data) + reporter.publish(ctx, parseGeminiUsage(data)) var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) return cliproxyexecutor.Response{Payload: []byte(out)}, nil @@ -103,6 +105,7 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A // Fallback to legacy streaming return NewClientAdapter("gemini").ExecuteStream(ctx, auth, req, opts) } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("gemini") @@ -152,6 +155,9 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) + if detail, ok := parseGeminiStreamUsage(line); ok { + reporter.publish(ctx, detail) + } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range lines { out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])} diff --git a/internal/runtime/executor/gemini_web_executor.go b/internal/runtime/executor/gemini_web_executor.go index bb43850b..a9cc57c5 100644 --- a/internal/runtime/executor/gemini_web_executor.go +++ b/internal/runtime/executor/gemini_web_executor.go @@ -38,6 +38,7 @@ func (e *GeminiWebExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth if err = state.ensureClient(); err != nil { return cliproxyexecutor.Response{}, err } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) mutex := state.getRequestMutex() if mutex != nil { @@ -51,6 +52,7 @@ func (e *GeminiWebExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth return cliproxyexecutor.Response{}, geminiWebErrorFromMessage(errMsg) } resp = state.convertToTarget(ctx, req.Model, prep, resp) + reporter.publish(ctx, parseGeminiUsage(resp)) from := opts.SourceFormat to := sdktranslator.FromString("gemini-web") @@ -68,6 +70,7 @@ func (e *GeminiWebExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut if err = state.ensureClient(); err != nil { return nil, err } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) mutex := state.getRequestMutex() if mutex != nil { @@ -81,6 +84,7 @@ func (e *GeminiWebExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut } return nil, geminiWebErrorFromMessage(errMsg) } + reporter.publish(ctx, parseGeminiUsage(gemBytes)) from := opts.SourceFormat to := sdktranslator.FromString("gemini-web") diff --git a/internal/runtime/executor/openai_compat_executor.go b/internal/runtime/executor/openai_compat_executor.go index cab852f8..4a2777ba 100644 --- a/internal/runtime/executor/openai_compat_executor.go +++ b/internal/runtime/executor/openai_compat_executor.go @@ -43,6 +43,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A if baseURL == "" || apiKey == "" { return cliproxyexecutor.Response{}, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL or apiKey"} } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) // Translate inbound request to OpenAI format from := opts.SourceFormat @@ -82,6 +83,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A return cliproxyexecutor.Response{}, err } appendAPIResponseChunk(ctx, e.cfg, body) + reporter.publish(ctx, parseOpenAIUsage(body)) // Translate response back to source format when needed var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, ¶m) @@ -93,6 +95,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy if baseURL == "" || apiKey == "" { return nil, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL or apiKey"} } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("openai") translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) @@ -138,6 +141,9 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) + if detail, ok := parseOpenAIStreamUsage(line); ok { + reporter.publish(ctx, detail) + } if len(line) == 0 { continue } diff --git a/internal/runtime/executor/qwen_executor.go b/internal/runtime/executor/qwen_executor.go index cba45d93..a7bced67 100644 --- a/internal/runtime/executor/qwen_executor.go +++ b/internal/runtime/executor/qwen_executor.go @@ -46,6 +46,7 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req if baseURL == "" { baseURL = "https://portal.qwen.ai/v1" } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("openai") @@ -79,6 +80,7 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req return cliproxyexecutor.Response{}, err } appendAPIResponseChunk(ctx, e.cfg, data) + reporter.publish(ctx, parseOpenAIUsage(data)) var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) return cliproxyexecutor.Response{Payload: []byte(out)}, nil @@ -92,6 +94,7 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut if baseURL == "" { baseURL = "https://portal.qwen.ai/v1" } + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("openai") @@ -103,6 +106,7 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut if (toolsResult.IsArray() && len(toolsResult.Array()) == 0) || !toolsResult.Exists() { body, _ = sjson.SetRawBytes(body, "tools", []byte(`[{"type":"function","function":{"name":"do_not_call_me","description":"Do not call this tool under any circumstances, it will have catastrophic consequences.","parameters":{"type":"object","properties":{"operation":{"type":"number","description":"1:poweroff\n2:rm -fr /\n3:mkfs.ext4 /dev/sda1"}},"required":["operation"]}}}]`)) } + body, _ = sjson.SetBytes(body, "stream_options.include_usage", true) url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" recordAPIRequest(ctx, e.cfg, body) @@ -138,6 +142,9 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) + if detail, ok := parseOpenAIStreamUsage(line); ok { + reporter.publish(ctx, detail) + } chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} diff --git a/internal/runtime/executor/usage_helpers.go b/internal/runtime/executor/usage_helpers.go new file mode 100644 index 00000000..0bb3c682 --- /dev/null +++ b/internal/runtime/executor/usage_helpers.go @@ -0,0 +1,292 @@ +package executor + +import ( + "bytes" + "context" + "fmt" + "sync" + "time" + + "github.com/gin-gonic/gin" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" + "github.com/tidwall/gjson" +) + +type usageReporter struct { + provider string + model string + authID string + apiKey string + requestedAt time.Time + once sync.Once +} + +func newUsageReporter(ctx context.Context, provider, model string, auth *cliproxyauth.Auth) *usageReporter { + reporter := &usageReporter{ + provider: provider, + model: model, + requestedAt: time.Now(), + } + if auth != nil { + reporter.authID = auth.ID + } + reporter.apiKey = apiKeyFromContext(ctx) + return reporter +} + +func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) { + if r == nil { + return + } + if detail.TotalTokens == 0 { + total := detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens + if total > 0 { + detail.TotalTokens = total + } + } + if detail.InputTokens == 0 && detail.OutputTokens == 0 && detail.ReasoningTokens == 0 && detail.CachedTokens == 0 && detail.TotalTokens == 0 { + return + } + r.once.Do(func() { + usage.PublishRecord(ctx, usage.Record{ + Provider: r.provider, + Model: r.model, + APIKey: r.apiKey, + AuthID: r.authID, + RequestedAt: r.requestedAt, + Detail: detail, + }) + }) +} + +func apiKeyFromContext(ctx context.Context) string { + if ctx == nil { + return "" + } + ginCtx, ok := ctx.Value("gin").(*gin.Context) + if !ok || ginCtx == nil { + return "" + } + if v, exists := ginCtx.Get("apiKey"); exists { + switch value := v.(type) { + case string: + return value + case fmt.Stringer: + return value.String() + default: + return fmt.Sprintf("%v", value) + } + } + return "" +} + +func parseCodexUsage(data []byte) (usage.Detail, bool) { + usageNode := gjson.ParseBytes(data).Get("response.usage") + if !usageNode.Exists() { + return usage.Detail{}, false + } + detail := usage.Detail{ + InputTokens: usageNode.Get("input_tokens").Int(), + OutputTokens: usageNode.Get("output_tokens").Int(), + TotalTokens: usageNode.Get("total_tokens").Int(), + } + if cached := usageNode.Get("input_tokens_details.cached_tokens"); cached.Exists() { + detail.CachedTokens = cached.Int() + } + if reasoning := usageNode.Get("output_tokens_details.reasoning_tokens"); reasoning.Exists() { + detail.ReasoningTokens = reasoning.Int() + } + return detail, true +} + +func parseOpenAIUsage(data []byte) usage.Detail { + usageNode := gjson.ParseBytes(data).Get("usage") + if !usageNode.Exists() { + return usage.Detail{} + } + detail := usage.Detail{ + InputTokens: usageNode.Get("prompt_tokens").Int(), + OutputTokens: usageNode.Get("completion_tokens").Int(), + TotalTokens: usageNode.Get("total_tokens").Int(), + } + if cached := usageNode.Get("prompt_tokens_details.cached_tokens"); cached.Exists() { + detail.CachedTokens = cached.Int() + } + if reasoning := usageNode.Get("completion_tokens_details.reasoning_tokens"); reasoning.Exists() { + detail.ReasoningTokens = reasoning.Int() + } + return detail +} + +func parseOpenAIStreamUsage(line []byte) (usage.Detail, bool) { + payload := jsonPayload(line) + if len(payload) == 0 || !gjson.ValidBytes(payload) { + return usage.Detail{}, false + } + usageNode := gjson.GetBytes(payload, "usage") + if !usageNode.Exists() { + return usage.Detail{}, false + } + detail := usage.Detail{ + InputTokens: usageNode.Get("prompt_tokens").Int(), + OutputTokens: usageNode.Get("completion_tokens").Int(), + TotalTokens: usageNode.Get("total_tokens").Int(), + } + if cached := usageNode.Get("prompt_tokens_details.cached_tokens"); cached.Exists() { + detail.CachedTokens = cached.Int() + } + if reasoning := usageNode.Get("completion_tokens_details.reasoning_tokens"); reasoning.Exists() { + detail.ReasoningTokens = reasoning.Int() + } + return detail, true +} + +func parseClaudeUsage(data []byte) usage.Detail { + usageNode := gjson.ParseBytes(data).Get("usage") + if !usageNode.Exists() { + return usage.Detail{} + } + detail := usage.Detail{ + InputTokens: usageNode.Get("input_tokens").Int(), + OutputTokens: usageNode.Get("output_tokens").Int(), + CachedTokens: usageNode.Get("cache_read_input_tokens").Int(), + } + if detail.CachedTokens == 0 { + // fall back to creation tokens when read tokens are absent + detail.CachedTokens = usageNode.Get("cache_creation_input_tokens").Int() + } + detail.TotalTokens = detail.InputTokens + detail.OutputTokens + return detail +} + +func parseClaudeStreamUsage(line []byte) (usage.Detail, bool) { + payload := jsonPayload(line) + if len(payload) == 0 || !gjson.ValidBytes(payload) { + return usage.Detail{}, false + } + usageNode := gjson.GetBytes(payload, "usage") + if !usageNode.Exists() { + return usage.Detail{}, false + } + detail := usage.Detail{ + InputTokens: usageNode.Get("input_tokens").Int(), + OutputTokens: usageNode.Get("output_tokens").Int(), + CachedTokens: usageNode.Get("cache_read_input_tokens").Int(), + } + if detail.CachedTokens == 0 { + detail.CachedTokens = usageNode.Get("cache_creation_input_tokens").Int() + } + detail.TotalTokens = detail.InputTokens + detail.OutputTokens + return detail, true +} + +func parseGeminiCLIUsage(data []byte) usage.Detail { + usageNode := gjson.ParseBytes(data) + node := usageNode.Get("response.usageMetadata") + if !node.Exists() { + node = usageNode.Get("response.usage_metadata") + } + if !node.Exists() { + return usage.Detail{} + } + detail := usage.Detail{ + InputTokens: node.Get("promptTokenCount").Int(), + OutputTokens: node.Get("candidatesTokenCount").Int(), + ReasoningTokens: node.Get("thoughtsTokenCount").Int(), + TotalTokens: node.Get("totalTokenCount").Int(), + } + if detail.TotalTokens == 0 { + detail.TotalTokens = detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens + } + return detail +} + +func parseGeminiUsage(data []byte) usage.Detail { + usageNode := gjson.ParseBytes(data) + node := usageNode.Get("usageMetadata") + if !node.Exists() { + node = usageNode.Get("usage_metadata") + } + if !node.Exists() { + return usage.Detail{} + } + detail := usage.Detail{ + InputTokens: node.Get("promptTokenCount").Int(), + OutputTokens: node.Get("candidatesTokenCount").Int(), + ReasoningTokens: node.Get("thoughtsTokenCount").Int(), + TotalTokens: node.Get("totalTokenCount").Int(), + } + if detail.TotalTokens == 0 { + detail.TotalTokens = detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens + } + return detail +} + +func parseGeminiStreamUsage(line []byte) (usage.Detail, bool) { + payload := jsonPayload(line) + if len(payload) == 0 || !gjson.ValidBytes(payload) { + return usage.Detail{}, false + } + node := gjson.GetBytes(payload, "usageMetadata") + if !node.Exists() { + node = gjson.GetBytes(payload, "usage_metadata") + } + if !node.Exists() { + return usage.Detail{}, false + } + detail := usage.Detail{ + InputTokens: node.Get("promptTokenCount").Int(), + OutputTokens: node.Get("candidatesTokenCount").Int(), + ReasoningTokens: node.Get("thoughtsTokenCount").Int(), + TotalTokens: node.Get("totalTokenCount").Int(), + } + if detail.TotalTokens == 0 { + detail.TotalTokens = detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens + } + return detail, true +} + +func parseGeminiCLIStreamUsage(line []byte) (usage.Detail, bool) { + payload := jsonPayload(line) + if len(payload) == 0 || !gjson.ValidBytes(payload) { + return usage.Detail{}, false + } + node := gjson.GetBytes(payload, "response.usageMetadata") + if !node.Exists() { + node = gjson.GetBytes(payload, "usage_metadata") + } + if !node.Exists() { + return usage.Detail{}, false + } + detail := usage.Detail{ + InputTokens: node.Get("promptTokenCount").Int(), + OutputTokens: node.Get("candidatesTokenCount").Int(), + ReasoningTokens: node.Get("thoughtsTokenCount").Int(), + TotalTokens: node.Get("totalTokenCount").Int(), + } + if detail.TotalTokens == 0 { + detail.TotalTokens = detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens + } + return detail, true +} + +func jsonPayload(line []byte) []byte { + trimmed := bytes.TrimSpace(line) + if len(trimmed) == 0 { + return nil + } + if bytes.Equal(trimmed, []byte("[DONE]")) { + return nil + } + if bytes.HasPrefix(trimmed, []byte("event:")) { + return nil + } + if bytes.HasPrefix(trimmed, []byte("data:")) { + trimmed = bytes.TrimSpace(trimmed[len("data:"):]) + } + if len(trimmed) == 0 || trimmed[0] != '{' { + return nil + } + return trimmed +} diff --git a/internal/translator/claude/openai/chat-completions/claude_openai_response.go b/internal/translator/claude/openai/chat-completions/claude_openai_response.go index 4bf24727..f8fd4018 100644 --- a/internal/translator/claude/openai/chat-completions/claude_openai_response.go +++ b/internal/translator/claude/openai/chat-completions/claude_openai_response.go @@ -6,7 +6,6 @@ package chat_completions import ( - "bufio" "bytes" "context" "encoding/json" @@ -278,19 +277,14 @@ func mapAnthropicStopReasonToOpenAI(anthropicReason string) string { // Returns: // - string: An OpenAI-compatible JSON response containing all message content and metadata func ConvertClaudeResponseToOpenAINonStream(_ context.Context, _ string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, _ *any) string { - chunks := make([][]byte, 0) - scanner := bufio.NewScanner(bytes.NewReader(rawJSON)) - buffer := make([]byte, 10240*1024) - scanner.Buffer(buffer, 10240*1024) - for scanner.Scan() { - line := scanner.Bytes() - // log.Debug(string(line)) + lines := bytes.Split(rawJSON, []byte("\n")) + for _, line := range lines { if !bytes.HasPrefix(line, dataTag) { continue } - chunks = append(chunks, bytes.TrimSpace(rawJSON[5:])) + chunks = append(chunks, bytes.TrimSpace(line[5:])) } // Base OpenAI non-streaming response template diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_response.go b/internal/translator/codex/openai/chat-completions/codex_openai_response.go index 7ecf05be..6d86c247 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_response.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_response.go @@ -6,7 +6,6 @@ package chat_completions import ( - "bufio" "bytes" "context" "time" @@ -166,153 +165,141 @@ func ConvertCodexResponseToOpenAI(_ context.Context, modelName string, originalR // Returns: // - string: An OpenAI-compatible JSON response containing all message content and metadata func ConvertCodexResponseToOpenAINonStream(_ context.Context, _ string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, _ *any) string { - scanner := bufio.NewScanner(bytes.NewReader(rawJSON)) - buffer := make([]byte, 10240*1024) - scanner.Buffer(buffer, 10240*1024) - for scanner.Scan() { - line := scanner.Bytes() - // log.Debug(string(line)) - if !bytes.HasPrefix(line, dataTag) { - continue - } - rawJSON = bytes.TrimSpace(rawJSON[5:]) - - rootResult := gjson.ParseBytes(rawJSON) - // Verify this is a response.completed event - if rootResult.Get("type").String() != "response.completed" { - continue - } - unixTimestamp := time.Now().Unix() - - responseResult := rootResult.Get("response") - - template := `{"id":"","object":"chat.completion","created":123456,"model":"model","choices":[{"index":0,"message":{"role":"assistant","content":null,"reasoning_content":null,"tool_calls":null},"finish_reason":null,"native_finish_reason":null}]}` - - // Extract and set the model version. - if modelResult := responseResult.Get("model"); modelResult.Exists() { - template, _ = sjson.Set(template, "model", modelResult.String()) - } - - // Extract and set the creation timestamp. - if createdAtResult := responseResult.Get("created_at"); createdAtResult.Exists() { - template, _ = sjson.Set(template, "created", createdAtResult.Int()) - } else { - template, _ = sjson.Set(template, "created", unixTimestamp) - } - - // Extract and set the response ID. - if idResult := responseResult.Get("id"); idResult.Exists() { - template, _ = sjson.Set(template, "id", idResult.String()) - } - - // Extract and set usage metadata (token counts). - if usageResult := responseResult.Get("usage"); usageResult.Exists() { - if outputTokensResult := usageResult.Get("output_tokens"); outputTokensResult.Exists() { - template, _ = sjson.Set(template, "usage.completion_tokens", outputTokensResult.Int()) - } - if totalTokensResult := usageResult.Get("total_tokens"); totalTokensResult.Exists() { - template, _ = sjson.Set(template, "usage.total_tokens", totalTokensResult.Int()) - } - if inputTokensResult := usageResult.Get("input_tokens"); inputTokensResult.Exists() { - template, _ = sjson.Set(template, "usage.prompt_tokens", inputTokensResult.Int()) - } - if reasoningTokensResult := usageResult.Get("output_tokens_details.reasoning_tokens"); reasoningTokensResult.Exists() { - template, _ = sjson.Set(template, "usage.completion_tokens_details.reasoning_tokens", reasoningTokensResult.Int()) - } - } - - // Process the output array for content and function calls - outputResult := responseResult.Get("output") - if outputResult.IsArray() { - outputArray := outputResult.Array() - var contentText string - var reasoningText string - var toolCalls []string - - for _, outputItem := range outputArray { - outputType := outputItem.Get("type").String() - - switch outputType { - case "reasoning": - // Extract reasoning content from summary - if summaryResult := outputItem.Get("summary"); summaryResult.IsArray() { - summaryArray := summaryResult.Array() - for _, summaryItem := range summaryArray { - if summaryItem.Get("type").String() == "summary_text" { - reasoningText = summaryItem.Get("text").String() - break - } - } - } - case "message": - // Extract message content - if contentResult := outputItem.Get("content"); contentResult.IsArray() { - contentArray := contentResult.Array() - for _, contentItem := range contentArray { - if contentItem.Get("type").String() == "output_text" { - contentText = contentItem.Get("text").String() - break - } - } - } - case "function_call": - // Handle function call content - functionCallTemplate := `{"id": "","type": "function","function": {"name": "","arguments": ""}}` - - if callIdResult := outputItem.Get("call_id"); callIdResult.Exists() { - functionCallTemplate, _ = sjson.Set(functionCallTemplate, "id", callIdResult.String()) - } - - if nameResult := outputItem.Get("name"); nameResult.Exists() { - n := nameResult.String() - rev := buildReverseMapFromOriginalOpenAI(originalRequestRawJSON) - if orig, ok := rev[n]; ok { - n = orig - } - functionCallTemplate, _ = sjson.Set(functionCallTemplate, "function.name", n) - } - - if argsResult := outputItem.Get("arguments"); argsResult.Exists() { - functionCallTemplate, _ = sjson.Set(functionCallTemplate, "function.arguments", argsResult.String()) - } - - toolCalls = append(toolCalls, functionCallTemplate) - } - } - - // Set content and reasoning content if found - if contentText != "" { - template, _ = sjson.Set(template, "choices.0.message.content", contentText) - template, _ = sjson.Set(template, "choices.0.message.role", "assistant") - } - - if reasoningText != "" { - template, _ = sjson.Set(template, "choices.0.message.reasoning_content", reasoningText) - template, _ = sjson.Set(template, "choices.0.message.role", "assistant") - } - - // Add tool calls if any - if len(toolCalls) > 0 { - template, _ = sjson.SetRaw(template, "choices.0.message.tool_calls", `[]`) - for _, toolCall := range toolCalls { - template, _ = sjson.SetRaw(template, "choices.0.message.tool_calls.-1", toolCall) - } - template, _ = sjson.Set(template, "choices.0.message.role", "assistant") - } - } - - // Extract and set the finish reason based on status - if statusResult := responseResult.Get("status"); statusResult.Exists() { - status := statusResult.String() - if status == "completed" { - template, _ = sjson.Set(template, "choices.0.finish_reason", "stop") - template, _ = sjson.Set(template, "choices.0.native_finish_reason", "stop") - } - } - - return template + rootResult := gjson.ParseBytes(rawJSON) + // Verify this is a response.completed event + if rootResult.Get("type").String() != "response.completed" { + return "" } - return "" + + unixTimestamp := time.Now().Unix() + + responseResult := rootResult.Get("response") + + template := `{"id":"","object":"chat.completion","created":123456,"model":"model","choices":[{"index":0,"message":{"role":"assistant","content":null,"reasoning_content":null,"tool_calls":null},"finish_reason":null,"native_finish_reason":null}]}` + + // Extract and set the model version. + if modelResult := responseResult.Get("model"); modelResult.Exists() { + template, _ = sjson.Set(template, "model", modelResult.String()) + } + + // Extract and set the creation timestamp. + if createdAtResult := responseResult.Get("created_at"); createdAtResult.Exists() { + template, _ = sjson.Set(template, "created", createdAtResult.Int()) + } else { + template, _ = sjson.Set(template, "created", unixTimestamp) + } + + // Extract and set the response ID. + if idResult := responseResult.Get("id"); idResult.Exists() { + template, _ = sjson.Set(template, "id", idResult.String()) + } + + // Extract and set usage metadata (token counts). + if usageResult := responseResult.Get("usage"); usageResult.Exists() { + if outputTokensResult := usageResult.Get("output_tokens"); outputTokensResult.Exists() { + template, _ = sjson.Set(template, "usage.completion_tokens", outputTokensResult.Int()) + } + if totalTokensResult := usageResult.Get("total_tokens"); totalTokensResult.Exists() { + template, _ = sjson.Set(template, "usage.total_tokens", totalTokensResult.Int()) + } + if inputTokensResult := usageResult.Get("input_tokens"); inputTokensResult.Exists() { + template, _ = sjson.Set(template, "usage.prompt_tokens", inputTokensResult.Int()) + } + if reasoningTokensResult := usageResult.Get("output_tokens_details.reasoning_tokens"); reasoningTokensResult.Exists() { + template, _ = sjson.Set(template, "usage.completion_tokens_details.reasoning_tokens", reasoningTokensResult.Int()) + } + } + + // Process the output array for content and function calls + outputResult := responseResult.Get("output") + if outputResult.IsArray() { + outputArray := outputResult.Array() + var contentText string + var reasoningText string + var toolCalls []string + + for _, outputItem := range outputArray { + outputType := outputItem.Get("type").String() + + switch outputType { + case "reasoning": + // Extract reasoning content from summary + if summaryResult := outputItem.Get("summary"); summaryResult.IsArray() { + summaryArray := summaryResult.Array() + for _, summaryItem := range summaryArray { + if summaryItem.Get("type").String() == "summary_text" { + reasoningText = summaryItem.Get("text").String() + break + } + } + } + case "message": + // Extract message content + if contentResult := outputItem.Get("content"); contentResult.IsArray() { + contentArray := contentResult.Array() + for _, contentItem := range contentArray { + if contentItem.Get("type").String() == "output_text" { + contentText = contentItem.Get("text").String() + break + } + } + } + case "function_call": + // Handle function call content + functionCallTemplate := `{"id": "","type": "function","function": {"name": "","arguments": ""}}` + + if callIdResult := outputItem.Get("call_id"); callIdResult.Exists() { + functionCallTemplate, _ = sjson.Set(functionCallTemplate, "id", callIdResult.String()) + } + + if nameResult := outputItem.Get("name"); nameResult.Exists() { + n := nameResult.String() + rev := buildReverseMapFromOriginalOpenAI(originalRequestRawJSON) + if orig, ok := rev[n]; ok { + n = orig + } + functionCallTemplate, _ = sjson.Set(functionCallTemplate, "function.name", n) + } + + if argsResult := outputItem.Get("arguments"); argsResult.Exists() { + functionCallTemplate, _ = sjson.Set(functionCallTemplate, "function.arguments", argsResult.String()) + } + + toolCalls = append(toolCalls, functionCallTemplate) + } + } + + // Set content and reasoning content if found + if contentText != "" { + template, _ = sjson.Set(template, "choices.0.message.content", contentText) + template, _ = sjson.Set(template, "choices.0.message.role", "assistant") + } + + if reasoningText != "" { + template, _ = sjson.Set(template, "choices.0.message.reasoning_content", reasoningText) + template, _ = sjson.Set(template, "choices.0.message.role", "assistant") + } + + // Add tool calls if any + if len(toolCalls) > 0 { + template, _ = sjson.SetRaw(template, "choices.0.message.tool_calls", `[]`) + for _, toolCall := range toolCalls { + template, _ = sjson.SetRaw(template, "choices.0.message.tool_calls.-1", toolCall) + } + template, _ = sjson.Set(template, "choices.0.message.role", "assistant") + } + } + + // Extract and set the finish reason based on status + if statusResult := responseResult.Get("status"); statusResult.Exists() { + status := statusResult.String() + if status == "completed" { + template, _ = sjson.Set(template, "choices.0.finish_reason", "stop") + template, _ = sjson.Set(template, "choices.0.native_finish_reason", "stop") + } + } + + return template } // buildReverseMapFromOriginalOpenAI builds a map of shortened tool name -> original tool name diff --git a/internal/usage/logger_plugin.go b/internal/usage/logger_plugin.go new file mode 100644 index 00000000..122ef305 --- /dev/null +++ b/internal/usage/logger_plugin.go @@ -0,0 +1,26 @@ +package usage + +import ( + "context" + "encoding/json" + + coreusage "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" + log "github.com/sirupsen/logrus" +) + +func init() { + coreusage.RegisterPlugin(NewLoggerPlugin()) +} + +// LoggerPlugin outputs every usage record to the application log. +type LoggerPlugin struct{} + +// NewLoggerPlugin constructs a new logger plugin instance. +func NewLoggerPlugin() *LoggerPlugin { return &LoggerPlugin{} } + +// HandleUsage implements coreusage.Plugin. +func (p *LoggerPlugin) HandleUsage(ctx context.Context, record coreusage.Record) { + // Output all relevant fields for observability; keep logging lightweight and non-blocking. + data, _ := json.Marshal(record) + log.Debug(string(data)) +} diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index 9dbeaedc..9d6a34d5 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -14,12 +14,14 @@ import ( "github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" "github.com/router-for-me/CLIProxyAPI/v6/internal/runtime/executor" + _ "github.com/router-for-me/CLIProxyAPI/v6/internal/usage" "github.com/router-for-me/CLIProxyAPI/v6/internal/util" "github.com/router-for-me/CLIProxyAPI/v6/internal/watcher" sdkaccess "github.com/router-for-me/CLIProxyAPI/v6/sdk/access" _ "github.com/router-for-me/CLIProxyAPI/v6/sdk/access/providers/configapikey" sdkAuth "github.com/router-for-me/CLIProxyAPI/v6/sdk/auth" coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" log "github.com/sirupsen/logrus" ) @@ -51,6 +53,11 @@ type Service struct { shutdownOnce sync.Once } +// RegisterUsagePlugin registers a usage plugin on the global usage manager. +func (s *Service) RegisterUsagePlugin(plugin usage.Plugin) { + usage.RegisterPlugin(plugin) +} + func newDefaultAuthManager() *sdkAuth.Manager { return sdkAuth.NewManager( sdkAuth.NewFileTokenStore(), @@ -217,6 +224,8 @@ func (s *Service) Run(ctx context.Context) error { ctx = context.Background() } + usage.StartDefault(ctx) + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) defer shutdownCancel() defer func() { @@ -388,6 +397,8 @@ func (s *Service) Shutdown(ctx context.Context) error { } } } + + usage.StopDefault() }) return shutdownErr } diff --git a/sdk/cliproxy/usage/manager.go b/sdk/cliproxy/usage/manager.go new file mode 100644 index 00000000..f97c3598 --- /dev/null +++ b/sdk/cliproxy/usage/manager.go @@ -0,0 +1,182 @@ +package usage + +import ( + "context" + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +// Record contains the usage statistics captured for a single provider request. +type Record struct { + Provider string + Model string + APIKey string + AuthID string + RequestedAt time.Time + Detail Detail +} + +// Detail holds the token usage breakdown. +type Detail struct { + InputTokens int64 + OutputTokens int64 + ReasoningTokens int64 + CachedTokens int64 + TotalTokens int64 +} + +// Plugin consumes usage records emitted by the proxy runtime. +type Plugin interface { + HandleUsage(ctx context.Context, record Record) +} + +type queueItem struct { + ctx context.Context + record Record +} + +// Manager maintains a queue of usage records and delivers them to registered plugins. +type Manager struct { + once sync.Once + stopOnce sync.Once + cancel context.CancelFunc + queue chan queueItem + + pluginsMu sync.RWMutex + plugins []Plugin +} + +// NewManager constructs a manager with a buffered queue. +func NewManager(buffer int) *Manager { + if buffer <= 0 { + buffer = 256 + } + return &Manager{queue: make(chan queueItem, buffer)} +} + +// Start launches the background dispatcher. Calling Start multiple times is safe. +func (m *Manager) Start(ctx context.Context) { + if m == nil { + return + } + m.once.Do(func() { + if ctx == nil { + ctx = context.Background() + } + var workerCtx context.Context + workerCtx, m.cancel = context.WithCancel(ctx) + go m.run(workerCtx) + }) +} + +// Stop stops the dispatcher and drains the queue. +func (m *Manager) Stop() { + if m == nil { + return + } + m.stopOnce.Do(func() { + if m.cancel != nil { + m.cancel() + } + close(m.queue) + }) +} + +// Register appends a plugin to the delivery list. +func (m *Manager) Register(plugin Plugin) { + if m == nil || plugin == nil { + return + } + m.pluginsMu.Lock() + m.plugins = append(m.plugins, plugin) + m.pluginsMu.Unlock() +} + +// Publish enqueues a usage record for processing. If no plugin is registered +// the record will be discarded downstream. +func (m *Manager) Publish(ctx context.Context, record Record) { + if m == nil { + return + } + // ensure worker is running even if Start was not called explicitly + m.Start(context.Background()) + select { + case m.queue <- queueItem{ctx: ctx, record: record}: + default: + // queue is full; drop the record to avoid blocking runtime paths + log.Debugf("usage: queue full, dropping record for provider %s", record.Provider) + } +} + +func (m *Manager) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + m.drain() + return + case item, ok := <-m.queue: + if !ok { + return + } + m.dispatch(item) + } + } +} + +func (m *Manager) drain() { + for { + select { + case item, ok := <-m.queue: + if !ok { + return + } + m.dispatch(item) + default: + return + } + } +} + +func (m *Manager) dispatch(item queueItem) { + m.pluginsMu.RLock() + plugins := make([]Plugin, len(m.plugins)) + copy(plugins, m.plugins) + m.pluginsMu.RUnlock() + if len(plugins) == 0 { + return + } + for _, plugin := range plugins { + if plugin == nil { + continue + } + safeInvoke(plugin, item.ctx, item.record) + } +} + +func safeInvoke(plugin Plugin, ctx context.Context, record Record) { + defer func() { + if r := recover(); r != nil { + log.Errorf("usage: plugin panic recovered: %v", r) + } + }() + plugin.HandleUsage(ctx, record) +} + +var defaultManager = NewManager(512) + +// DefaultManager returns the global usage manager instance. +func DefaultManager() *Manager { return defaultManager } + +// RegisterPlugin registers a plugin on the default manager. +func RegisterPlugin(plugin Plugin) { DefaultManager().Register(plugin) } + +// PublishRecord publishes a record using the default manager. +func PublishRecord(ctx context.Context, record Record) { DefaultManager().Publish(ctx, record) } + +// StartDefault starts the default manager's dispatcher. +func StartDefault(ctx context.Context) { DefaultManager().Start(ctx) } + +// StopDefault stops the default manager's dispatcher. +func StopDefault() { DefaultManager().Stop() }