package executor import ( "bufio" "bytes" "context" "fmt" "io" "net/http" "os" "path/filepath" "runtime" "strings" "time" kimiauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/kimi" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/thinking" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" log "github.com/sirupsen/logrus" "github.com/tidwall/sjson" ) // KimiExecutor is a stateless executor for Kimi API using OpenAI-compatible chat completions. type KimiExecutor struct { ClaudeExecutor cfg *config.Config } // NewKimiExecutor creates a new Kimi executor. func NewKimiExecutor(cfg *config.Config) *KimiExecutor { return &KimiExecutor{cfg: cfg} } // Identifier returns the executor identifier. func (e *KimiExecutor) Identifier() string { return "kimi" } // PrepareRequest injects Kimi credentials into the outgoing HTTP request. func (e *KimiExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error { if req == nil { return nil } token := kimiCreds(auth) if strings.TrimSpace(token) != "" { req.Header.Set("Authorization", "Bearer "+token) } return nil } // HttpRequest injects Kimi credentials into the request and executes it. func (e *KimiExecutor) HttpRequest(ctx context.Context, auth *cliproxyauth.Auth, req *http.Request) (*http.Response, error) { if req == nil { return nil, fmt.Errorf("kimi executor: request is nil") } if ctx == nil { ctx = req.Context() } httpReq := req.WithContext(ctx) if err := e.PrepareRequest(httpReq, auth); err != nil { return nil, err } httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) return httpClient.Do(httpReq) } // Execute performs a non-streaming chat completion request to Kimi. func (e *KimiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { from := opts.SourceFormat if from.String() == "claude" { auth.Attributes["base_url"] = kimiauth.KimiAPIBaseURL return e.ClaudeExecutor.Execute(ctx, auth, req, opts) } baseModel := thinking.ParseSuffix(req.Model).ModelName token := kimiCreds(auth) reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth) defer reporter.trackFailure(ctx, &err) to := sdktranslator.FromString("openai") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, false) body := sdktranslator.TranslateRequest(from, to, baseModel, bytes.Clone(req.Payload), false) // Strip kimi- prefix for upstream API upstreamModel := stripKimiPrefix(baseModel) body, err = sjson.SetBytes(body, "model", upstreamModel) if err != nil { return resp, fmt.Errorf("kimi executor: failed to set model in payload: %w", err) } body, err = thinking.ApplyThinking(body, req.Model, from.String(), "kimi", e.Identifier()) if err != nil { return resp, err } requestedModel := payloadRequestedModel(opts, req.Model) body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel) url := kimiauth.KimiAPIBaseURL + "/v1/chat/completions" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { return resp, err } applyKimiHeadersWithAuth(httpReq, token, false, auth) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return resp, err } defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("kimi executor: close response body error: %v", errClose) } }() recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} return resp, err } data, err := io.ReadAll(httpResp.Body) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return resp, err } appendAPIResponseChunk(ctx, e.cfg, data) reporter.publish(ctx, parseOpenAIUsage(data)) var param any // Note: TranslateNonStream uses req.Model (original with suffix) to preserve // the original model name in the response for client compatibility. out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) resp = cliproxyexecutor.Response{Payload: []byte(out)} return resp, nil } // ExecuteStream performs a streaming chat completion request to Kimi. func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { from := opts.SourceFormat if from.String() == "claude" { auth.Attributes["base_url"] = kimiauth.KimiAPIBaseURL return e.ClaudeExecutor.ExecuteStream(ctx, auth, req, opts) } baseModel := thinking.ParseSuffix(req.Model).ModelName token := kimiCreds(auth) reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth) defer reporter.trackFailure(ctx, &err) to := sdktranslator.FromString("openai") originalPayload := bytes.Clone(req.Payload) if len(opts.OriginalRequest) > 0 { originalPayload = bytes.Clone(opts.OriginalRequest) } originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, true) body := sdktranslator.TranslateRequest(from, to, baseModel, bytes.Clone(req.Payload), true) // Strip kimi- prefix for upstream API upstreamModel := stripKimiPrefix(baseModel) body, err = sjson.SetBytes(body, "model", upstreamModel) if err != nil { return nil, fmt.Errorf("kimi executor: failed to set model in payload: %w", err) } body, err = thinking.ApplyThinking(body, req.Model, from.String(), "kimi", e.Identifier()) if err != nil { return nil, err } body, err = sjson.SetBytes(body, "stream_options.include_usage", true) if err != nil { return nil, fmt.Errorf("kimi executor: failed to set stream_options in payload: %w", err) } requestedModel := payloadRequestedModel(opts, req.Model) body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel) url := kimiauth.KimiAPIBaseURL + "/v1/chat/completions" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { return nil, err } applyKimiHeadersWithAuth(httpReq, token, true, auth) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), Body: body, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, AuthType: authType, AuthValue: authValue, }) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpResp, err := httpClient.Do(httpReq) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return nil, err } recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("kimi executor: close response body error: %v", errClose) } err = statusErr{code: httpResp.StatusCode, msg: string(b)} return nil, err } out := make(chan cliproxyexecutor.StreamChunk) stream = out go func() { defer close(out) defer func() { if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("kimi executor: close response body error: %v", errClose) } }() scanner := bufio.NewScanner(httpResp.Body) scanner.Buffer(nil, 1_048_576) // 1MB var param any for scanner.Scan() { line := scanner.Bytes() appendAPIResponseChunk(ctx, e.cfg, line) if detail, ok := parseOpenAIStreamUsage(line); ok { reporter.publish(ctx, detail) } chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } } doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone([]byte("[DONE]")), ¶m) for i := range doneChunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(doneChunks[i])} } if errScan := scanner.Err(); errScan != nil { recordAPIResponseError(ctx, e.cfg, errScan) reporter.publishFailure(ctx) out <- cliproxyexecutor.StreamChunk{Err: errScan} } }() return stream, nil } // CountTokens estimates token count for Kimi requests. func (e *KimiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { auth.Attributes["base_url"] = kimiauth.KimiAPIBaseURL return e.ClaudeExecutor.CountTokens(ctx, auth, req, opts) } // Refresh refreshes the Kimi token using the refresh token. func (e *KimiExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { log.Debugf("kimi executor: refresh called") if auth == nil { return nil, fmt.Errorf("kimi executor: auth is nil") } // Expect refresh_token in metadata for OAuth-based accounts var refreshToken string if auth.Metadata != nil { if v, ok := auth.Metadata["refresh_token"].(string); ok && strings.TrimSpace(v) != "" { refreshToken = v } } if strings.TrimSpace(refreshToken) == "" { // Nothing to refresh return auth, nil } client := kimiauth.NewDeviceFlowClientWithDeviceID(e.cfg, resolveKimiDeviceID(auth)) td, err := client.RefreshToken(ctx, refreshToken) if err != nil { return nil, err } if auth.Metadata == nil { auth.Metadata = make(map[string]any) } auth.Metadata["access_token"] = td.AccessToken if td.RefreshToken != "" { auth.Metadata["refresh_token"] = td.RefreshToken } if td.ExpiresAt > 0 { exp := time.Unix(td.ExpiresAt, 0).UTC().Format(time.RFC3339) auth.Metadata["expired"] = exp } auth.Metadata["type"] = "kimi" now := time.Now().Format(time.RFC3339) auth.Metadata["last_refresh"] = now return auth, nil } // applyKimiHeaders sets required headers for Kimi API requests. // Headers match kimi-cli client for compatibility. func applyKimiHeaders(r *http.Request, token string, stream bool) { r.Header.Set("Content-Type", "application/json") r.Header.Set("Authorization", "Bearer "+token) // Match kimi-cli headers exactly r.Header.Set("User-Agent", "KimiCLI/1.10.6") r.Header.Set("X-Msh-Platform", "kimi_cli") r.Header.Set("X-Msh-Version", "1.10.6") r.Header.Set("X-Msh-Device-Name", getKimiHostname()) r.Header.Set("X-Msh-Device-Model", getKimiDeviceModel()) r.Header.Set("X-Msh-Device-Id", getKimiDeviceID()) if stream { r.Header.Set("Accept", "text/event-stream") return } r.Header.Set("Accept", "application/json") } func resolveKimiDeviceIDFromAuth(auth *cliproxyauth.Auth) string { if auth == nil || auth.Metadata == nil { return "" } deviceIDRaw, ok := auth.Metadata["device_id"] if !ok { return "" } deviceID, ok := deviceIDRaw.(string) if !ok { return "" } return strings.TrimSpace(deviceID) } func resolveKimiDeviceIDFromStorage(auth *cliproxyauth.Auth) string { if auth == nil { return "" } storage, ok := auth.Storage.(*kimiauth.KimiTokenStorage) if !ok || storage == nil { return "" } return strings.TrimSpace(storage.DeviceID) } func resolveKimiDeviceID(auth *cliproxyauth.Auth) string { deviceID := resolveKimiDeviceIDFromAuth(auth) if deviceID != "" { return deviceID } return resolveKimiDeviceIDFromStorage(auth) } func applyKimiHeadersWithAuth(r *http.Request, token string, stream bool, auth *cliproxyauth.Auth) { applyKimiHeaders(r, token, stream) if deviceID := resolveKimiDeviceID(auth); deviceID != "" { r.Header.Set("X-Msh-Device-Id", deviceID) } } // getKimiHostname returns the machine hostname. func getKimiHostname() string { hostname, err := os.Hostname() if err != nil { return "unknown" } return hostname } // getKimiDeviceModel returns a device model string matching kimi-cli format. func getKimiDeviceModel() string { return fmt.Sprintf("%s %s", runtime.GOOS, runtime.GOARCH) } // getKimiDeviceID returns a stable device ID, matching kimi-cli storage location. func getKimiDeviceID() string { homeDir, err := os.UserHomeDir() if err != nil { return "cli-proxy-api-device" } // Check kimi-cli's device_id location first (platform-specific) var kimiShareDir string switch runtime.GOOS { case "darwin": kimiShareDir = filepath.Join(homeDir, "Library", "Application Support", "kimi") case "windows": appData := os.Getenv("APPDATA") if appData == "" { appData = filepath.Join(homeDir, "AppData", "Roaming") } kimiShareDir = filepath.Join(appData, "kimi") default: // linux and other unix-like kimiShareDir = filepath.Join(homeDir, ".local", "share", "kimi") } deviceIDPath := filepath.Join(kimiShareDir, "device_id") if data, err := os.ReadFile(deviceIDPath); err == nil { return strings.TrimSpace(string(data)) } return "cli-proxy-api-device" } // kimiCreds extracts the access token from auth. func kimiCreds(a *cliproxyauth.Auth) (token string) { if a == nil { return "" } // Check metadata first (OAuth flow stores tokens here) if a.Metadata != nil { if v, ok := a.Metadata["access_token"].(string); ok && strings.TrimSpace(v) != "" { return v } } // Fallback to attributes (API key style) if a.Attributes != nil { if v := a.Attributes["access_token"]; v != "" { return v } if v := a.Attributes["api_key"]; v != "" { return v } } return "" } // stripKimiPrefix removes the "kimi-" prefix from model names for the upstream API. func stripKimiPrefix(model string) string { model = strings.TrimSpace(model) if strings.HasPrefix(strings.ToLower(model), "kimi-") { return model[5:] } return model }