diff --git a/internal/api/handlers/management/usage.go b/internal/api/handlers/management/usage.go index 37a2d97b..e9c7dd20 100644 --- a/internal/api/handlers/management/usage.go +++ b/internal/api/handlers/management/usage.go @@ -13,5 +13,8 @@ func (h *Handler) GetUsageStatistics(c *gin.Context) { if h != nil && h.usageStats != nil { snapshot = h.usageStats.Snapshot() } - c.JSON(http.StatusOK, gin.H{"usage": snapshot}) + c.JSON(http.StatusOK, gin.H{ + "usage": snapshot, + "failed_requests": snapshot.FailureCount, + }) } diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 2939df1e..77cfc1ea 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -36,13 +36,14 @@ func (e *ClaudeExecutor) Identifier() string { return "claude" } func (e *ClaudeExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } -func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { +func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { apiKey, baseURL := claudeCreds(auth) if baseURL == "" { baseURL = "https://api.anthropic.com" } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("claude") // Use streaming translation to preserve function calling, except for claude. @@ -56,7 +57,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { - return cliproxyexecutor.Response{}, err + return resp, err } applyClaudeHeaders(httpReq, apiKey, false) var authID, authLabel, authType, authValue string @@ -78,30 +79,31 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } defer func() { - if errClose := resp.Body.Close(); errClose != nil { + if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("response body close error: %v", errClose) } }() - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - b, _ := io.ReadAll(resp.Body) + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return resp, err } - reader := io.Reader(resp.Body) + reader := io.Reader(httpResp.Body) var decoder *zstd.Decoder - if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) { - decoder, err = zstd.NewReader(resp.Body) + if hasZSTDEcoding(httpResp.Header.Get("Content-Encoding")) { + decoder, err = zstd.NewReader(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err) + return resp, fmt.Errorf("failed to initialize zstd decoder: %w", err) } reader = decoder defer decoder.Close() @@ -109,7 +111,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r data, err := io.ReadAll(reader) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } appendAPIResponseChunk(ctx, e.cfg, data) if stream { @@ -124,16 +126,18 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r } var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) - return cliproxyexecutor.Response{Payload: []byte(out)}, nil + resp = cliproxyexecutor.Response{Payload: []byte(out)} + return resp, nil } -func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { +func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { apiKey, baseURL := claudeCreds(auth) if baseURL == "" { baseURL = "https://api.anthropic.com" } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("claude") body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) @@ -164,27 +168,35 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - defer func() { _ = resp.Body.Close() }() - b, _ := io.ReadAll(resp.Body) + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return nil, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) + } + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return nil, err } out := make(chan cliproxyexecutor.StreamChunk) + stream = out go func() { defer close(out) - defer func() { _ = resp.Body.Close() }() + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) + } + }() // If from == to (Claude → Claude), directly forward the SSE stream without translation if from == to { - scanner := bufio.NewScanner(resp.Body) + scanner := bufio.NewScanner(httpResp.Body) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) for scanner.Scan() { @@ -199,15 +211,16 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A cloned[len(line)] = '\n' out <- cliproxyexecutor.StreamChunk{Payload: cloned} } - if err = scanner.Err(); err != nil { - recordAPIResponseError(ctx, e.cfg, err) - out <- cliproxyexecutor.StreamChunk{Err: err} + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} } return } // For other formats, use translation - scanner := bufio.NewScanner(resp.Body) + scanner := bufio.NewScanner(httpResp.Body) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) var param any @@ -222,12 +235,13 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } - if err = scanner.Err(); err != nil { - recordAPIResponseError(ctx, e.cfg, err) - out <- cliproxyexecutor.StreamChunk{Err: err} + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() - return out, nil + return stream, nil } func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index a68af42f..7fe27874 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -39,13 +39,14 @@ func (e *CodexExecutor) Identifier() string { return "codex" } func (e *CodexExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } -func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { +func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { apiKey, baseURL := codexCreds(auth) if baseURL == "" { baseURL = "https://chatgpt.com/backend-api/codex" } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("codex") @@ -101,7 +102,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re url := strings.TrimSuffix(baseURL, "/") + "/responses" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { - return cliproxyexecutor.Response{}, err + return resp, err } applyCodexHeaders(httpReq, auth, apiKey) var authID, authLabel, authType, authValue string @@ -125,23 +126,28 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } - defer func() { _ = resp.Body.Close() }() - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - b, _ := io.ReadAll(resp.Body) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("codex executor: close response body error: %v", errClose) + } + }() + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return resp, err } - data, err := io.ReadAll(resp.Body) + data, err := io.ReadAll(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } appendAPIResponseChunk(ctx, e.cfg, data) @@ -162,18 +168,21 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, line, ¶m) - return cliproxyexecutor.Response{Payload: []byte(out)}, nil + resp = cliproxyexecutor.Response{Payload: []byte(out)} + return resp, nil } - return cliproxyexecutor.Response{}, statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"} + err = statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"} + return resp, err } -func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { +func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { apiKey, baseURL := codexCreds(auth) if baseURL == "" { baseURL = "https://chatgpt.com/backend-api/codex" } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("codex") @@ -253,28 +262,36 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - defer func() { _ = resp.Body.Close() }() - b, readErr := io.ReadAll(resp.Body) + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + data, readErr := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("codex executor: close response body error: %v", errClose) + } if readErr != nil { recordAPIResponseError(ctx, e.cfg, readErr) return nil, readErr } - appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return nil, statusErr{code: resp.StatusCode, msg: string(b)} + appendAPIResponseChunk(ctx, e.cfg, data) + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data)) + err = statusErr{code: httpResp.StatusCode, msg: string(data)} + return nil, err } out := make(chan cliproxyexecutor.StreamChunk) + stream = out go func() { defer close(out) - defer func() { _ = resp.Body.Close() }() - scanner := bufio.NewScanner(resp.Body) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("codex executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(httpResp.Body) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) var param any @@ -296,12 +313,13 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } - if err = scanner.Err(); err != nil { - recordAPIResponseError(ctx, e.cfg, err) - out <- cliproxyexecutor.StreamChunk{Err: err} + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() - return out, nil + return stream, nil } func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { diff --git a/internal/runtime/executor/gemini_cli_executor.go b/internal/runtime/executor/gemini_cli_executor.go index 3ab0b4cf..d8691706 100644 --- a/internal/runtime/executor/gemini_cli_executor.go +++ b/internal/runtime/executor/gemini_cli_executor.go @@ -51,12 +51,13 @@ func (e *GeminiCLIExecutor) Identifier() string { return "gemini-cli" } func (e *GeminiCLIExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } -func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { +func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth) if err != nil { - return cliproxyexecutor.Response{}, err + return resp, err } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("gemini-cli") @@ -104,7 +105,8 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth tok, errTok := tokenSource.Token() if errTok != nil { - return cliproxyexecutor.Response{}, errTok + err = errTok + return resp, err } updateGeminiCLITokenMetadata(auth, baseTokenData, tok) @@ -115,7 +117,8 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload)) if errReq != nil { - return cliproxyexecutor.Response{}, errReq + err = errReq + return resp, err } reqHTTP.Header.Set("Content-Type", "application/json") reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken) @@ -133,44 +136,60 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth AuthValue: authValue, }) - resp, errDo := httpClient.Do(reqHTTP) + httpResp, errDo := httpClient.Do(reqHTTP) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) - return cliproxyexecutor.Response{}, errDo + err = errDo + return resp, err } - data, errRead := io.ReadAll(resp.Body) - _ = resp.Body.Close() - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) + + data, errRead := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("gemini cli executor: close response body error: %v", errClose) + } + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) - return cliproxyexecutor.Response{}, errRead + err = errRead + return resp, err } appendAPIResponseChunk(ctx, e.cfg, data) - if resp.StatusCode >= 200 && resp.StatusCode < 300 { + if httpResp.StatusCode >= 200 && httpResp.StatusCode < 300 { reporter.publish(ctx, parseGeminiCLIUsage(data)) var param any out := sdktranslator.TranslateNonStream(respCtx, to, from, attemptModel, bytes.Clone(opts.OriginalRequest), payload, data, ¶m) - return cliproxyexecutor.Response{Payload: []byte(out)}, nil + resp = cliproxyexecutor.Response{Payload: []byte(out)} + return resp, nil } - lastStatus = resp.StatusCode + + lastStatus = httpResp.StatusCode lastBody = append([]byte(nil), data...) - if resp.StatusCode != 429 { - break + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data)) + if httpResp.StatusCode == 429 { + continue } + + err = statusErr{code: httpResp.StatusCode, msg: string(data)} + return resp, err } if len(lastBody) > 0 { appendAPIResponseChunk(ctx, e.cfg, lastBody) } - return cliproxyexecutor.Response{}, statusErr{code: lastStatus, msg: string(lastBody)} + if lastStatus == 0 { + lastStatus = 429 + } + err = statusErr{code: lastStatus, msg: string(lastBody)} + return resp, err } -func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { +func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth) if err != nil { return nil, err } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("gemini-cli") @@ -207,7 +226,8 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut tok, errTok := tokenSource.Token() if errTok != nil { - return nil, errTok + err = errTok + return nil, err } updateGeminiCLITokenMetadata(auth, baseTokenData, tok) @@ -220,7 +240,8 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload)) if errReq != nil { - return nil, errReq + err = errReq + return nil, err } reqHTTP.Header.Set("Content-Type", "application/json") reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken) @@ -238,33 +259,43 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut AuthValue: authValue, }) - resp, errDo := httpClient.Do(reqHTTP) + httpResp, errDo := httpClient.Do(reqHTTP) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) - return nil, errDo + err = errDo + return nil, err } - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - data, errRead := io.ReadAll(resp.Body) - _ = resp.Body.Close() + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + data, errRead := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("gemini cli executor: close response body error: %v", errClose) + } if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) - return nil, errRead + err = errRead + return nil, err } appendAPIResponseChunk(ctx, e.cfg, data) - lastStatus = resp.StatusCode + lastStatus = httpResp.StatusCode lastBody = append([]byte(nil), data...) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(data)) - if resp.StatusCode == 429 { + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data)) + if httpResp.StatusCode == 429 { continue } - return nil, statusErr{code: resp.StatusCode, msg: string(data)} + err = statusErr{code: httpResp.StatusCode, msg: string(data)} + return nil, err } out := make(chan cliproxyexecutor.StreamChunk) + stream = out go func(resp *http.Response, reqBody []byte, attempt string) { defer close(out) - defer func() { _ = resp.Body.Close() }() + defer func() { + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("gemini cli executor: close response body error: %v", errClose) + } + }() if opts.Alt == "" { scanner := bufio.NewScanner(resp.Body) buf := make([]byte, 20_971_520) @@ -290,6 +321,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut } if errScan := scanner.Err(); errScan != nil { recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} } return @@ -298,6 +330,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut data, errRead := io.ReadAll(resp.Body) if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) + reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errRead} return } @@ -313,15 +346,19 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut for i := range segments { out <- cliproxyexecutor.StreamChunk{Payload: []byte(segments[i])} } - }(resp, append([]byte(nil), payload...), attemptModel) + }(httpResp, append([]byte(nil), payload...), attemptModel) - return out, nil + return stream, nil } + if len(lastBody) > 0 { + appendAPIResponseChunk(ctx, e.cfg, lastBody) + } if lastStatus == 0 { lastStatus = 429 } - return nil, statusErr{code: lastStatus, msg: string(lastBody)} + err = statusErr{code: lastStatus, msg: string(lastBody)} + return nil, err } func (e *GeminiCLIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { diff --git a/internal/runtime/executor/gemini_executor.go b/internal/runtime/executor/gemini_executor.go index ae01eb37..180f07fb 100644 --- a/internal/runtime/executor/gemini_executor.go +++ b/internal/runtime/executor/gemini_executor.go @@ -68,10 +68,11 @@ func (e *GeminiExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) e // Returns: // - cliproxyexecutor.Response: The response from the API // - error: An error if the request fails -func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { +func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { apiKey, bearer := geminiCreds(auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) // Official Gemini API via API key or OAuth bearer from := opts.SourceFormat @@ -98,7 +99,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { - return cliproxyexecutor.Response{}, err + return resp, err } httpReq.Header.Set("Content-Type", "application/json") if apiKey != "" { @@ -125,35 +126,42 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } - defer func() { _ = resp.Body.Close() }() - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - b, _ := io.ReadAll(resp.Body) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("gemini executor: close response body error: %v", errClose) + } + }() + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return resp, err } - data, err := io.ReadAll(resp.Body) + data, err := io.ReadAll(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } appendAPIResponseChunk(ctx, e.cfg, data) reporter.publish(ctx, parseGeminiUsage(data)) var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) - return cliproxyexecutor.Response{Payload: []byte(out)}, nil + resp = cliproxyexecutor.Response{Payload: []byte(out)} + return resp, nil } -func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { +func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { apiKey, bearer := geminiCreds(auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("gemini") @@ -202,24 +210,32 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - defer func() { _ = resp.Body.Close() }() - b, _ := io.ReadAll(resp.Body) + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return nil, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("gemini executor: close response body error: %v", errClose) + } + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return nil, err } out := make(chan cliproxyexecutor.StreamChunk) + stream = out go func() { defer close(out) - defer func() { _ = resp.Body.Close() }() - scanner := bufio.NewScanner(resp.Body) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("gemini executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(httpResp.Body) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) var param any @@ -238,12 +254,13 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A for i := range lines { out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])} } - if err = scanner.Err(); err != nil { - recordAPIResponseError(ctx, e.cfg, err) - out <- cliproxyexecutor.StreamChunk{Err: err} + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() - return out, nil + return stream, nil } func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { diff --git a/internal/runtime/executor/iflow_executor.go b/internal/runtime/executor/iflow_executor.go index 2bd38ba1..905594ef 100644 --- a/internal/runtime/executor/iflow_executor.go +++ b/internal/runtime/executor/iflow_executor.go @@ -41,16 +41,18 @@ func (e *IFlowExecutor) Identifier() string { return "iflow" } func (e *IFlowExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } // Execute performs a non-streaming chat completion request. -func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { +func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { apiKey, baseURL := iflowCreds(auth) if strings.TrimSpace(apiKey) == "" { - return cliproxyexecutor.Response{}, fmt.Errorf("iflow executor: missing api key") + err = fmt.Errorf("iflow executor: missing api key") + return resp, err } if baseURL == "" { baseURL = iflowauth.DefaultAPIBaseURL } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("openai") @@ -60,7 +62,7 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) if err != nil { - return cliproxyexecutor.Response{}, err + return resp, err } applyIFlowHeaders(httpReq, apiKey, false) var authID, authLabel, authType, authValue string @@ -82,45 +84,53 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } - defer func() { _ = resp.Body.Close() }() - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("iflow executor: close response body error: %v", errClose) + } + }() + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - b, _ := io.ReadAll(resp.Body) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("iflow request error: status %d body %s", resp.StatusCode, string(b)) - return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("iflow request error: status %d body %s", httpResp.StatusCode, string(b)) + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return resp, err } - data, err := io.ReadAll(resp.Body) + data, err := io.ReadAll(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } appendAPIResponseChunk(ctx, e.cfg, data) reporter.publish(ctx, parseOpenAIUsage(data)) var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) - return cliproxyexecutor.Response{Payload: []byte(out)}, nil + resp = cliproxyexecutor.Response{Payload: []byte(out)} + return resp, nil } // ExecuteStream performs a streaming chat completion request. -func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { +func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { apiKey, baseURL := iflowCreds(auth) if strings.TrimSpace(apiKey) == "" { - return nil, fmt.Errorf("iflow executor: missing api key") + err = fmt.Errorf("iflow executor: missing api key") + return nil, err } if baseURL == "" { baseURL = iflowauth.DefaultAPIBaseURL } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("openai") @@ -158,27 +168,35 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - defer func() { _ = resp.Body.Close() }() - b, _ := io.ReadAll(resp.Body) - appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("iflow streaming error: status %d body %s", resp.StatusCode, string(b)) - return nil, statusErr{code: resp.StatusCode, msg: string(b)} + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + data, _ := io.ReadAll(httpResp.Body) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("iflow executor: close response body error: %v", errClose) + } + appendAPIResponseChunk(ctx, e.cfg, data) + log.Debugf("iflow streaming error: status %d body %s", httpResp.StatusCode, string(data)) + err = statusErr{code: httpResp.StatusCode, msg: string(data)} + return nil, err } out := make(chan cliproxyexecutor.StreamChunk) + stream = out go func() { defer close(out) - defer func() { _ = resp.Body.Close() }() + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("iflow executor: close response body error: %v", errClose) + } + }() - scanner := bufio.NewScanner(resp.Body) + scanner := bufio.NewScanner(httpResp.Body) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) var param any @@ -193,13 +211,14 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } - if err := scanner.Err(); err != nil { - recordAPIResponseError(ctx, e.cfg, err) - out <- cliproxyexecutor.StreamChunk{Err: err} + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() - return out, nil + return stream, nil } // CountTokens is not implemented for iFlow. diff --git a/internal/runtime/executor/openai_compat_executor.go b/internal/runtime/executor/openai_compat_executor.go index e5d0aeec..127a2791 100644 --- a/internal/runtime/executor/openai_compat_executor.go +++ b/internal/runtime/executor/openai_compat_executor.go @@ -38,12 +38,15 @@ func (e *OpenAICompatExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.A return nil } -func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { +func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) + baseURL, apiKey := e.resolveCredentials(auth) if baseURL == "" { - return cliproxyexecutor.Response{}, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"} + err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"} + return } - reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) // Translate inbound request to OpenAI format from := opts.SourceFormat @@ -56,7 +59,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated)) if err != nil { - return cliproxyexecutor.Response{}, err + return resp, err } httpReq.Header.Set("Content-Type", "application/json") if apiKey != "" { @@ -82,38 +85,47 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } - defer func() { _ = resp.Body.Close() }() - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - b, _ := io.ReadAll(resp.Body) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("openai compat executor: close response body error: %v", errClose) + } + }() + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return resp, err } - body, err := io.ReadAll(resp.Body) + body, err := io.ReadAll(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } appendAPIResponseChunk(ctx, e.cfg, body) reporter.publish(ctx, parseOpenAIUsage(body)) // Translate response back to source format when needed var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, ¶m) - return cliproxyexecutor.Response{Payload: []byte(out)}, nil + resp = cliproxyexecutor.Response{Payload: []byte(out)} + return resp, nil } -func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { +func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { + reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) + baseURL, apiKey := e.resolveCredentials(auth) if baseURL == "" { - return nil, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"} + err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"} + return nil, err } - reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) from := opts.SourceFormat to := sdktranslator.FromString("openai") translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) @@ -152,24 +164,32 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - defer func() { _ = resp.Body.Close() }() - b, _ := io.ReadAll(resp.Body) + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return nil, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("openai compat executor: close response body error: %v", errClose) + } + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return nil, err } out := make(chan cliproxyexecutor.StreamChunk) + stream = out go func() { defer close(out) - defer func() { _ = resp.Body.Close() }() - scanner := bufio.NewScanner(resp.Body) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("openai compat executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(httpResp.Body) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) var param any @@ -189,12 +209,13 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } - if err = scanner.Err(); err != nil { - recordAPIResponseError(ctx, e.cfg, err) - out <- cliproxyexecutor.StreamChunk{Err: err} + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() - return out, nil + return stream, nil } func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { diff --git a/internal/runtime/executor/qwen_executor.go b/internal/runtime/executor/qwen_executor.go index e631e615..22e5c5da 100644 --- a/internal/runtime/executor/qwen_executor.go +++ b/internal/runtime/executor/qwen_executor.go @@ -38,13 +38,14 @@ func (e *QwenExecutor) Identifier() string { return "qwen" } func (e *QwenExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } -func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { +func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { token, baseURL := qwenCreds(auth) if baseURL == "" { baseURL = "https://portal.qwen.ai/v1" } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("openai") @@ -53,7 +54,7 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { - return cliproxyexecutor.Response{}, err + return resp, err } applyQwenHeaders(httpReq, token, false) var authID, authLabel, authType, authValue string @@ -75,38 +76,45 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } - defer func() { _ = resp.Body.Close() }() - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - b, _ := io.ReadAll(resp.Body) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("qwen executor: close response body error: %v", errClose) + } + }() + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return resp, err } - data, err := io.ReadAll(resp.Body) + data, err := io.ReadAll(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, err + return resp, err } appendAPIResponseChunk(ctx, e.cfg, data) reporter.publish(ctx, parseOpenAIUsage(data)) var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) - return cliproxyexecutor.Response{Payload: []byte(out)}, nil + resp = cliproxyexecutor.Response{Payload: []byte(out)} + return resp, nil } -func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { +func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { token, baseURL := qwenCreds(auth) if baseURL == "" { baseURL = "https://portal.qwen.ai/v1" } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) + defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("openai") @@ -145,24 +153,32 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - resp, err := httpClient.Do(httpReq) + httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } - recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - defer func() { _ = resp.Body.Close() }() - b, _ := io.ReadAll(resp.Body) + recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) - return nil, statusErr{code: resp.StatusCode, msg: string(b)} + log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("qwen executor: close response body error: %v", errClose) + } + err = statusErr{code: httpResp.StatusCode, msg: string(b)} + return nil, err } out := make(chan cliproxyexecutor.StreamChunk) + stream = out go func() { defer close(out) - defer func() { _ = resp.Body.Close() }() - scanner := bufio.NewScanner(resp.Body) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("qwen executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(httpResp.Body) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) var param any @@ -177,12 +193,17 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } - if err = scanner.Err(); err != nil { - recordAPIResponseError(ctx, e.cfg, err) - out <- cliproxyexecutor.StreamChunk{Err: err} + doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone([]byte("[DONE]")), ¶m) + for i := range doneChunks { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(doneChunks[i])} + } + if errScan := scanner.Err(); errScan != nil { + recordAPIResponseError(ctx, e.cfg, errScan) + reporter.publishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() - return out, nil + return stream, nil } func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { diff --git a/internal/runtime/executor/usage_helpers.go b/internal/runtime/executor/usage_helpers.go index d27fd524..4e7cab90 100644 --- a/internal/runtime/executor/usage_helpers.go +++ b/internal/runtime/executor/usage_helpers.go @@ -41,6 +41,23 @@ func newUsageReporter(ctx context.Context, provider, model string, auth *cliprox } func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) { + r.publishWithOutcome(ctx, detail, false) +} + +func (r *usageReporter) publishFailure(ctx context.Context) { + r.publishWithOutcome(ctx, usage.Detail{}, true) +} + +func (r *usageReporter) trackFailure(ctx context.Context, errPtr *error) { + if r == nil || errPtr == nil { + return + } + if *errPtr != nil { + r.publishFailure(ctx) + } +} + +func (r *usageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool) { if r == nil { return } @@ -50,7 +67,7 @@ func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) { detail.TotalTokens = total } } - if detail.InputTokens == 0 && detail.OutputTokens == 0 && detail.ReasoningTokens == 0 && detail.CachedTokens == 0 && detail.TotalTokens == 0 { + if detail.InputTokens == 0 && detail.OutputTokens == 0 && detail.ReasoningTokens == 0 && detail.CachedTokens == 0 && detail.TotalTokens == 0 && !failed { return } r.once.Do(func() { @@ -61,6 +78,7 @@ func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) { APIKey: r.apiKey, AuthID: r.authID, RequestedAt: r.requestedAt, + Failed: failed, Detail: detail, }) }) diff --git a/internal/usage/logger_plugin.go b/internal/usage/logger_plugin.go index 78bce813..64c61d87 100644 --- a/internal/usage/logger_plugin.go +++ b/internal/usage/logger_plugin.go @@ -91,6 +91,7 @@ type RequestDetail struct { Timestamp time.Time `json:"timestamp"` Source string `json:"source"` Tokens TokenStats `json:"tokens"` + Failed bool `json:"failed"` } // TokenStats captures the token usage breakdown for a request. @@ -165,7 +166,11 @@ func (s *RequestStatistics) Record(ctx context.Context, record coreusage.Record) if statsKey == "" { statsKey = resolveAPIIdentifier(ctx, record) } - success := resolveSuccess(ctx) + failed := record.Failed + if !failed { + failed = !resolveSuccess(ctx) + } + success := !failed modelName := record.Model if modelName == "" { modelName = "unknown" @@ -193,6 +198,7 @@ func (s *RequestStatistics) Record(ctx context.Context, record coreusage.Record) Timestamp: timestamp, Source: record.Source, Tokens: detail, + Failed: failed, }) s.requestsByDay[dayKey]++ diff --git a/sdk/cliproxy/usage/manager.go b/sdk/cliproxy/usage/manager.go index 25efbdc2..9ef73356 100644 --- a/sdk/cliproxy/usage/manager.go +++ b/sdk/cliproxy/usage/manager.go @@ -16,6 +16,7 @@ type Record struct { AuthID string Source string RequestedAt time.Time + Failed bool Detail Detail }