package executor import ( "bufio" "bytes" "context" "fmt" "io" "net/http" "strings" "time" iflowauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/iflow" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/util" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) const ( iflowDefaultEndpoint = "/chat/completions" iflowUserAgent = "iFlow-Cli" ) // IFlowExecutor executes OpenAI-compatible chat completions against the iFlow API using API keys derived from OAuth. type IFlowExecutor struct { cfg *config.Config } // NewIFlowExecutor constructs a new executor instance. func NewIFlowExecutor(cfg *config.Config) *IFlowExecutor { return &IFlowExecutor{cfg: cfg} } // Identifier returns the provider key. func (e *IFlowExecutor) Identifier() string { return "iflow" } // PrepareRequest implements ProviderExecutor but requires no preprocessing. func (e *IFlowExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } // Execute performs a non-streaming chat completion request. func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { apiKey, baseURL := iflowCreds(auth) if strings.TrimSpace(apiKey) == "" { err = fmt.Errorf("iflow executor: missing api key") return resp, err } if baseURL == "" { baseURL = iflowauth.DefaultAPIBaseURL } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("openai") body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false) endpoint := strings.TrimSuffix(baseURL, "/") + iflowDefaultEndpoint httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) if err != nil { return resp, err } applyIFlowHeaders(httpReq, apiKey, false) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: endpoint, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return resp, err } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("iflow executor: close response body error: %v", errClose) } }() recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Debugf("iflow request error: status %d body %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } data, err := io.ReadAll(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return resp, err } appendAPIResponseChunk(ctx, e.cfg, data) reporter.publish(ctx, parseOpenAIUsage(data)) var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) resp = cliproxyexecutor.Response{Payload: []byte(out)} return resp, nil } // ExecuteStream performs a streaming chat completion request. func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { apiKey, baseURL := iflowCreds(auth) if strings.TrimSpace(apiKey) == "" { err = fmt.Errorf("iflow executor: missing api key") return nil, err } if baseURL == "" { baseURL = iflowauth.DefaultAPIBaseURL } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) defer reporter.trackFailure(ctx, &err) from := opts.SourceFormat to := sdktranslator.FromString("openai") body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) // Ensure tools array exists to avoid provider quirks similar to Qwen's behaviour. toolsResult := gjson.GetBytes(body, "tools") if toolsResult.Exists() && toolsResult.IsArray() && len(toolsResult.Array()) == 0 { body = ensureToolsArray(body) } endpoint := strings.TrimSuffix(baseURL, "/") + iflowDefaultEndpoint httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) if err != nil { return nil, err } applyIFlowHeaders(httpReq, apiKey, true) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: endpoint, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { data, _ := io.ReadAll(httpResp.Body) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("iflow executor: close response body error: %v", errClose) } appendAPIResponseChunk(ctx, e.cfg, data) log.Debugf("iflow streaming error: status %d body %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data)) err = statusErr{code: httpResp.StatusCode, msg: string(data)} return nil, err } out := make(chan cliproxyexecutor.StreamChunk) stream = out go func() { defer close(out) defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("iflow executor: close response body error: %v", errClose) } }() scanner := bufio.NewScanner(httpResp.Body) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) var param any for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) if detail, ok := parseOpenAIStreamUsage(line); ok { reporter.publish(ctx, detail) } chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } if errScan := scanner.Err(); errScan != nil { recordAPIResponseError(ctx, e.cfg, errScan) reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() return stream, nil } func (e *IFlowExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { from := opts.SourceFormat to := sdktranslator.FromString("openai") body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false) enc, err := tokenizerForModel(req.Model) if err != nil { return cliproxyexecutor.Response{}, fmt.Errorf("iflow executor: tokenizer init failed: %w", err) } count, err := countOpenAIChatTokens(enc, body) if err != nil { return cliproxyexecutor.Response{}, fmt.Errorf("iflow executor: token counting failed: %w", err) } usageJSON := buildOpenAIUsageJSON(count) translated := sdktranslator.TranslateTokenCount(ctx, to, from, count, usageJSON) return cliproxyexecutor.Response{Payload: []byte(translated)}, nil } // Refresh refreshes OAuth tokens and updates the stored API key. func (e *IFlowExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { log.Debugf("iflow executor: refresh called") if auth == nil { return nil, fmt.Errorf("iflow executor: auth is nil") } refreshToken := "" oldAccessToken := "" if auth.Metadata != nil { if v, ok := auth.Metadata["refresh_token"].(string); ok { refreshToken = strings.TrimSpace(v) } if v, ok := auth.Metadata["access_token"].(string); ok { oldAccessToken = strings.TrimSpace(v) } } if refreshToken == "" { return auth, nil } // Log the old access token (masked) before refresh if oldAccessToken != "" { log.Debugf("iflow executor: refreshing access token, old: %s", util.HideAPIKey(oldAccessToken)) } svc := iflowauth.NewIFlowAuth(e.cfg) tokenData, err := svc.RefreshTokens(ctx, refreshToken) if err != nil { log.Errorf("iflow executor: token refresh failed: %v", err) return nil, err } if auth.Metadata == nil { auth.Metadata = make(map[string]any) } auth.Metadata["access_token"] = tokenData.AccessToken if tokenData.RefreshToken != "" { auth.Metadata["refresh_token"] = tokenData.RefreshToken } if tokenData.APIKey != "" { auth.Metadata["api_key"] = tokenData.APIKey } auth.Metadata["expired"] = tokenData.Expire auth.Metadata["type"] = "iflow" auth.Metadata["last_refresh"] = time.Now().Format(time.RFC3339) // Log the new access token (masked) after successful refresh log.Debugf("iflow executor: token refresh successful, new: %s", util.HideAPIKey(tokenData.AccessToken)) if auth.Attributes == nil { auth.Attributes = make(map[string]string) } if tokenData.APIKey != "" { auth.Attributes["api_key"] = tokenData.APIKey } return auth, nil } func applyIFlowHeaders(r *http.Request, apiKey string, stream bool) { r.Header.Set("Content-Type", "application/json") r.Header.Set("Authorization", "Bearer "+apiKey) r.Header.Set("User-Agent", iflowUserAgent) if stream { r.Header.Set("Accept", "text/event-stream") } else { r.Header.Set("Accept", "application/json") } } func iflowCreds(a *cliproxyauth.Auth) (apiKey, baseURL string) { if a == nil { return "", "" } if a.Attributes != nil { if v := strings.TrimSpace(a.Attributes["api_key"]); v != "" { apiKey = v } if v := strings.TrimSpace(a.Attributes["base_url"]); v != "" { baseURL = v } } if apiKey == "" && a.Metadata != nil { if v, ok := a.Metadata["api_key"].(string); ok { apiKey = strings.TrimSpace(v) } } if baseURL == "" && a.Metadata != nil { if v, ok := a.Metadata["base_url"].(string); ok { baseURL = strings.TrimSpace(v) } } return apiKey, baseURL } func ensureToolsArray(body []byte) []byte { placeholder := `[{"type":"function","function":{"name":"noop","description":"Placeholder tool to stabilise streaming","parameters":{"type":"object"}}}]` updated, err := sjson.SetRawBytes(body, "tools", []byte(placeholder)) if err != nil { return body } return updated }