Refactor executor error handling and usage reporting

- Updated the Execute methods in various executors (GeminiCLIExecutor, GeminiExecutor, IFlowExecutor, OpenAICompatExecutor, QwenExecutor) to return a response and error as named return values for improved clarity.
- Enhanced error handling by deferring failure tracking in usage reporters, ensuring that failures are reported correctly.
- Improved response body handling by ensuring proper closure and error logging for HTTP responses across all executors.
- Added failure tracking and reporting in the usage reporter to capture unsuccessful requests.
- Updated the usage logging structure to include a 'Failed' field for better tracking of request outcomes.
- Adjusted the logic in the RequestStatistics and Record methods to accommodate the new failure tracking mechanism.
This commit is contained in:
Luis Pater
2025-10-21 11:22:24 +08:00
parent 67f553806b
commit 20985d1a10
11 changed files with 390 additions and 215 deletions

View File

@@ -41,16 +41,18 @@ func (e *IFlowExecutor) Identifier() string { return "iflow" }
func (e *IFlowExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
// Execute performs a non-streaming chat completion request.
func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
apiKey, baseURL := iflowCreds(auth)
if strings.TrimSpace(apiKey) == "" {
return cliproxyexecutor.Response{}, fmt.Errorf("iflow executor: missing api key")
err = fmt.Errorf("iflow executor: missing api key")
return resp, err
}
if baseURL == "" {
baseURL = iflowauth.DefaultAPIBaseURL
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
@@ -60,7 +62,7 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
if err != nil {
return cliproxyexecutor.Response{}, err
return resp, err
}
applyIFlowHeaders(httpReq, apiKey, false)
var authID, authLabel, authType, authValue string
@@ -82,45 +84,53 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
defer func() { _ = resp.Body.Close() }()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("iflow executor: close response body error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body)
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("iflow request error: status %d body %s", resp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("iflow request error: status %d body %s", httpResp.StatusCode, string(b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
}
data, err := io.ReadAll(resp.Body)
data, err := io.ReadAll(httpResp.Body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, data)
reporter.publish(ctx, parseOpenAIUsage(data))
var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
// ExecuteStream performs a streaming chat completion request.
func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
apiKey, baseURL := iflowCreds(auth)
if strings.TrimSpace(apiKey) == "" {
return nil, fmt.Errorf("iflow executor: missing api key")
err = fmt.Errorf("iflow executor: missing api key")
return nil, err
}
if baseURL == "" {
baseURL = iflowauth.DefaultAPIBaseURL
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
@@ -158,27 +168,35 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }()
b, _ := io.ReadAll(resp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("iflow streaming error: status %d body %s", resp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
data, _ := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("iflow executor: close response body error: %v", errClose)
}
appendAPIResponseChunk(ctx, e.cfg, data)
log.Debugf("iflow streaming error: status %d body %s", httpResp.StatusCode, string(data))
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() {
defer close(out)
defer func() { _ = resp.Body.Close() }()
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("iflow executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(resp.Body)
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)
var param any
@@ -193,13 +211,14 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
}
}
if err := scanner.Err(); err != nil {
recordAPIResponseError(ctx, e.cfg, err)
out <- cliproxyexecutor.StreamChunk{Err: err}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
return out, nil
return stream, nil
}
// CountTokens is not implemented for iFlow.