package executor import ( "bufio" "bytes" "context" "fmt" "io" "net/http" "strings" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" log "github.com/sirupsen/logrus" ) // OpenAICompatExecutor implements a stateless executor for OpenAI-compatible providers. // It performs request/response translation and executes against the provider base URL // using per-auth credentials (API key) and per-auth HTTP transport (proxy) from context. type OpenAICompatExecutor struct { provider string cfg *config.Config } // NewOpenAICompatExecutor creates an executor bound to a provider key (e.g., "openrouter"). func NewOpenAICompatExecutor(provider string, cfg *config.Config) *OpenAICompatExecutor { return &OpenAICompatExecutor{provider: provider, cfg: cfg} } // Identifier implements cliproxyauth.ProviderExecutor. func (e *OpenAICompatExecutor) Identifier() string { return e.provider } // PrepareRequest is a no-op for now (credentials are added via headers at execution time). func (e *OpenAICompatExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { baseURL, apiKey := e.resolveCredentials(auth) if baseURL == "" || apiKey == "" { return cliproxyexecutor.Response{}, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL or apiKey"} } // Translate inbound request to OpenAI format from := opts.SourceFormat to := sdktranslator.FromString("openai") translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), opts.Stream) url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" recordAPIRequest(ctx, e.cfg, translated) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated)) if err != nil { return cliproxyexecutor.Response{}, err } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Authorization", "Bearer "+apiKey) httpReq.Header.Set("User-Agent", "cli-proxy-openai-compat") httpClient := &http.Client{} if rt, ok := ctx.Value("cliproxy.roundtripper").(http.RoundTripper); ok && rt != nil { httpClient.Transport = rt } resp, err := httpClient.Do(httpReq) if err != nil { return cliproxyexecutor.Response{}, err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode < 200 || resp.StatusCode >= 300 { b, _ := io.ReadAll(resp.Body) appendAPIResponseChunk(ctx, e.cfg, b) return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} } body, err := io.ReadAll(resp.Body) if err != nil { return cliproxyexecutor.Response{}, err } appendAPIResponseChunk(ctx, e.cfg, body) // Translate response back to source format when needed var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, ¶m) return cliproxyexecutor.Response{Payload: []byte(out)}, nil } func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { baseURL, apiKey := e.resolveCredentials(auth) if baseURL == "" || apiKey == "" { return nil, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL or apiKey"} } from := opts.SourceFormat to := sdktranslator.FromString("openai") translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" recordAPIRequest(ctx, e.cfg, translated) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated)) if err != nil { return nil, err } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Authorization", "Bearer "+apiKey) httpReq.Header.Set("User-Agent", "cli-proxy-openai-compat") httpReq.Header.Set("Accept", "text/event-stream") httpReq.Header.Set("Cache-Control", "no-cache") httpClient := &http.Client{Timeout: 0} if rt, ok := ctx.Value("cliproxy.roundtripper").(http.RoundTripper); ok && rt != nil { httpClient.Transport = rt } resp, err := httpClient.Do(httpReq) if err != nil { return nil, err } if resp.StatusCode < 200 || resp.StatusCode >= 300 { defer func() { _ = resp.Body.Close() }() b, _ := io.ReadAll(resp.Body) appendAPIResponseChunk(ctx, e.cfg, b) return nil, statusErr{code: resp.StatusCode, msg: string(b)} } out := make(chan cliproxyexecutor.StreamChunk) go func() { defer close(out) defer func() { _ = resp.Body.Close() }() scanner := bufio.NewScanner(resp.Body) buf := make([]byte, 1024*1024) scanner.Buffer(buf, 1024*1024) var param any for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) if len(line) == 0 { continue } // OpenAI-compatible streams are SSE: lines typically prefixed with "data: ". // Pass through translator; it yields one or more chunks for the target schema. chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(line), ¶m) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } if err = scanner.Err(); err != nil { out <- cliproxyexecutor.StreamChunk{Err: err} } }() return out, nil } // Refresh is a no-op for API-key based compatibility providers. func (e *OpenAICompatExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { log.Debugf("openai compat executor: refresh called") _ = ctx return auth, nil } func (e *OpenAICompatExecutor) resolveCredentials(auth *cliproxyauth.Auth) (baseURL, apiKey string) { if auth == nil { return "", "" } if auth.Attributes != nil { baseURL = auth.Attributes["base_url"] apiKey = auth.Attributes["api_key"] } return } type statusErr struct { code int msg string } func (e statusErr) Error() string { if e.msg != "" { return e.msg } return fmt.Sprintf("status %d", e.code) } func (e statusErr) StatusCode() int { return e.code }