Files
CLIProxyAPI/internal/runtime/executor/kimi_executor.go
Luis Pater 7e9d0db6aa Merge pull request #1467 from dusty-du/fix/kimi-toolcall-reasoning-content
Fix Kimi tool-call payload normalization for reasoning_content
2026-02-07 09:35:04 +08:00

619 lines
19 KiB
Go

package executor
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"time"
kimiauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/kimi"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/thinking"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
// KimiExecutor is a stateless executor for Kimi API using OpenAI-compatible chat completions.
type KimiExecutor struct {
ClaudeExecutor
cfg *config.Config
}
// NewKimiExecutor creates a new Kimi executor.
func NewKimiExecutor(cfg *config.Config) *KimiExecutor { return &KimiExecutor{cfg: cfg} }
// Identifier returns the executor identifier.
func (e *KimiExecutor) Identifier() string { return "kimi" }
// PrepareRequest injects Kimi credentials into the outgoing HTTP request.
func (e *KimiExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error {
if req == nil {
return nil
}
token := kimiCreds(auth)
if strings.TrimSpace(token) != "" {
req.Header.Set("Authorization", "Bearer "+token)
}
return nil
}
// HttpRequest injects Kimi credentials into the request and executes it.
func (e *KimiExecutor) HttpRequest(ctx context.Context, auth *cliproxyauth.Auth, req *http.Request) (*http.Response, error) {
if req == nil {
return nil, fmt.Errorf("kimi executor: request is nil")
}
if ctx == nil {
ctx = req.Context()
}
httpReq := req.WithContext(ctx)
if err := e.PrepareRequest(httpReq, auth); err != nil {
return nil, err
}
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
return httpClient.Do(httpReq)
}
// Execute performs a non-streaming chat completion request to Kimi.
func (e *KimiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
from := opts.SourceFormat
if from.String() == "claude" {
auth.Attributes["base_url"] = kimiauth.KimiAPIBaseURL
return e.ClaudeExecutor.Execute(ctx, auth, req, opts)
}
baseModel := thinking.ParseSuffix(req.Model).ModelName
token := kimiCreds(auth)
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
defer reporter.trackFailure(ctx, &err)
to := sdktranslator.FromString("openai")
originalPayloadSource := req.Payload
if len(opts.OriginalRequest) > 0 {
originalPayloadSource = opts.OriginalRequest
}
originalPayload := bytes.Clone(originalPayloadSource)
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, false)
body := sdktranslator.TranslateRequest(from, to, baseModel, bytes.Clone(req.Payload), false)
// Strip kimi- prefix for upstream API
upstreamModel := stripKimiPrefix(baseModel)
body, err = sjson.SetBytes(body, "model", upstreamModel)
if err != nil {
return resp, fmt.Errorf("kimi executor: failed to set model in payload: %w", err)
}
body, err = thinking.ApplyThinking(body, req.Model, from.String(), "kimi", e.Identifier())
if err != nil {
return resp, err
}
requestedModel := payloadRequestedModel(opts, req.Model)
body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel)
body, err = normalizeKimiToolMessageLinks(body)
if err != nil {
return resp, err
}
url := kimiauth.KimiAPIBaseURL + "/v1/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return resp, err
}
applyKimiHeadersWithAuth(httpReq, token, false, auth)
var authID, authLabel, authType, authValue string
if auth != nil {
authID = auth.ID
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
URL: url,
Method: http.MethodPost,
Headers: httpReq.Header.Clone(),
Body: body,
Provider: e.Identifier(),
AuthID: authID,
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return resp, err
}
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("kimi executor: close response body error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
}
data, err := io.ReadAll(httpResp.Body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, data)
reporter.publish(ctx, parseOpenAIUsage(data))
var param any
// Note: TranslateNonStream uses req.Model (original with suffix) to preserve
// the original model name in the response for client compatibility.
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, &param)
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
// ExecuteStream performs a streaming chat completion request to Kimi.
func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
from := opts.SourceFormat
if from.String() == "claude" {
auth.Attributes["base_url"] = kimiauth.KimiAPIBaseURL
return e.ClaudeExecutor.ExecuteStream(ctx, auth, req, opts)
}
baseModel := thinking.ParseSuffix(req.Model).ModelName
token := kimiCreds(auth)
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
defer reporter.trackFailure(ctx, &err)
to := sdktranslator.FromString("openai")
originalPayloadSource := req.Payload
if len(opts.OriginalRequest) > 0 {
originalPayloadSource = opts.OriginalRequest
}
originalPayload := bytes.Clone(originalPayloadSource)
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, true)
body := sdktranslator.TranslateRequest(from, to, baseModel, bytes.Clone(req.Payload), true)
// Strip kimi- prefix for upstream API
upstreamModel := stripKimiPrefix(baseModel)
body, err = sjson.SetBytes(body, "model", upstreamModel)
if err != nil {
return nil, fmt.Errorf("kimi executor: failed to set model in payload: %w", err)
}
body, err = thinking.ApplyThinking(body, req.Model, from.String(), "kimi", e.Identifier())
if err != nil {
return nil, err
}
body, err = sjson.SetBytes(body, "stream_options.include_usage", true)
if err != nil {
return nil, fmt.Errorf("kimi executor: failed to set stream_options in payload: %w", err)
}
requestedModel := payloadRequestedModel(opts, req.Model)
body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel)
body, err = normalizeKimiToolMessageLinks(body)
if err != nil {
return nil, err
}
url := kimiauth.KimiAPIBaseURL + "/v1/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return nil, err
}
applyKimiHeadersWithAuth(httpReq, token, true, auth)
var authID, authLabel, authType, authValue string
if auth != nil {
authID = auth.ID
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
URL: url,
Method: http.MethodPost,
Headers: httpReq.Header.Clone(),
Body: body,
Provider: e.Identifier(),
AuthID: authID,
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("kimi executor: close response body error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() {
defer close(out)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("kimi executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
scanner.Buffer(nil, 1_048_576) // 1MB
var param any
for scanner.Scan() {
line := scanner.Bytes()
appendAPIResponseChunk(ctx, e.cfg, line)
if detail, ok := parseOpenAIStreamUsage(line); ok {
reporter.publish(ctx, detail)
}
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), &param)
for i := range chunks {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
}
}
doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), &param)
for i := range doneChunks {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(doneChunks[i])}
}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
return stream, nil
}
// CountTokens estimates token count for Kimi requests.
func (e *KimiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
auth.Attributes["base_url"] = kimiauth.KimiAPIBaseURL
return e.ClaudeExecutor.CountTokens(ctx, auth, req, opts)
}
func normalizeKimiToolMessageLinks(body []byte) ([]byte, error) {
if len(body) == 0 || !gjson.ValidBytes(body) {
return body, nil
}
messages := gjson.GetBytes(body, "messages")
if !messages.Exists() || !messages.IsArray() {
return body, nil
}
out := body
pending := make([]string, 0)
patched := 0
patchedReasoning := 0
ambiguous := 0
latestReasoning := ""
hasLatestReasoning := false
removePending := func(id string) {
for idx := range pending {
if pending[idx] != id {
continue
}
pending = append(pending[:idx], pending[idx+1:]...)
return
}
}
msgs := messages.Array()
for msgIdx := range msgs {
msg := msgs[msgIdx]
role := strings.TrimSpace(msg.Get("role").String())
switch role {
case "assistant":
reasoning := msg.Get("reasoning_content")
if reasoning.Exists() {
reasoningText := reasoning.String()
if strings.TrimSpace(reasoningText) != "" {
latestReasoning = reasoningText
hasLatestReasoning = true
}
}
toolCalls := msg.Get("tool_calls")
if !toolCalls.Exists() || !toolCalls.IsArray() || len(toolCalls.Array()) == 0 {
continue
}
if !reasoning.Exists() || strings.TrimSpace(reasoning.String()) == "" {
reasoningText := fallbackAssistantReasoning(msg, hasLatestReasoning, latestReasoning)
path := fmt.Sprintf("messages.%d.reasoning_content", msgIdx)
next, err := sjson.SetBytes(out, path, reasoningText)
if err != nil {
return body, fmt.Errorf("kimi executor: failed to set assistant reasoning_content: %w", err)
}
out = next
patchedReasoning++
}
for _, tc := range toolCalls.Array() {
id := strings.TrimSpace(tc.Get("id").String())
if id == "" {
continue
}
pending = append(pending, id)
}
case "tool":
toolCallID := strings.TrimSpace(msg.Get("tool_call_id").String())
if toolCallID == "" {
toolCallID = strings.TrimSpace(msg.Get("call_id").String())
if toolCallID != "" {
path := fmt.Sprintf("messages.%d.tool_call_id", msgIdx)
next, err := sjson.SetBytes(out, path, toolCallID)
if err != nil {
return body, fmt.Errorf("kimi executor: failed to set tool_call_id from call_id: %w", err)
}
out = next
patched++
}
}
if toolCallID == "" {
if len(pending) == 1 {
toolCallID = pending[0]
path := fmt.Sprintf("messages.%d.tool_call_id", msgIdx)
next, err := sjson.SetBytes(out, path, toolCallID)
if err != nil {
return body, fmt.Errorf("kimi executor: failed to infer tool_call_id: %w", err)
}
out = next
patched++
} else if len(pending) > 1 {
ambiguous++
}
}
if toolCallID != "" {
removePending(toolCallID)
}
}
}
if patched > 0 || patchedReasoning > 0 {
log.WithFields(log.Fields{
"patched_tool_messages": patched,
"patched_reasoning_messages": patchedReasoning,
}).Debug("kimi executor: normalized tool message fields")
}
if ambiguous > 0 {
log.WithFields(log.Fields{
"ambiguous_tool_messages": ambiguous,
"pending_tool_calls": len(pending),
}).Warn("kimi executor: tool messages missing tool_call_id with ambiguous candidates")
}
return out, nil
}
func fallbackAssistantReasoning(msg gjson.Result, hasLatest bool, latest string) string {
if hasLatest && strings.TrimSpace(latest) != "" {
return latest
}
content := msg.Get("content")
if content.Type == gjson.String {
if text := strings.TrimSpace(content.String()); text != "" {
return text
}
}
if content.IsArray() {
parts := make([]string, 0, len(content.Array()))
for _, item := range content.Array() {
text := strings.TrimSpace(item.Get("text").String())
if text == "" {
continue
}
parts = append(parts, text)
}
if len(parts) > 0 {
return strings.Join(parts, "\n")
}
}
return "[reasoning unavailable]"
}
// Refresh refreshes the Kimi token using the refresh token.
func (e *KimiExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) {
log.Debugf("kimi executor: refresh called")
if auth == nil {
return nil, fmt.Errorf("kimi executor: auth is nil")
}
// Expect refresh_token in metadata for OAuth-based accounts
var refreshToken string
if auth.Metadata != nil {
if v, ok := auth.Metadata["refresh_token"].(string); ok && strings.TrimSpace(v) != "" {
refreshToken = v
}
}
if strings.TrimSpace(refreshToken) == "" {
// Nothing to refresh
return auth, nil
}
client := kimiauth.NewDeviceFlowClientWithDeviceID(e.cfg, resolveKimiDeviceID(auth))
td, err := client.RefreshToken(ctx, refreshToken)
if err != nil {
return nil, err
}
if auth.Metadata == nil {
auth.Metadata = make(map[string]any)
}
auth.Metadata["access_token"] = td.AccessToken
if td.RefreshToken != "" {
auth.Metadata["refresh_token"] = td.RefreshToken
}
if td.ExpiresAt > 0 {
exp := time.Unix(td.ExpiresAt, 0).UTC().Format(time.RFC3339)
auth.Metadata["expired"] = exp
}
auth.Metadata["type"] = "kimi"
now := time.Now().Format(time.RFC3339)
auth.Metadata["last_refresh"] = now
return auth, nil
}
// applyKimiHeaders sets required headers for Kimi API requests.
// Headers match kimi-cli client for compatibility.
func applyKimiHeaders(r *http.Request, token string, stream bool) {
r.Header.Set("Content-Type", "application/json")
r.Header.Set("Authorization", "Bearer "+token)
// Match kimi-cli headers exactly
r.Header.Set("User-Agent", "KimiCLI/1.10.6")
r.Header.Set("X-Msh-Platform", "kimi_cli")
r.Header.Set("X-Msh-Version", "1.10.6")
r.Header.Set("X-Msh-Device-Name", getKimiHostname())
r.Header.Set("X-Msh-Device-Model", getKimiDeviceModel())
r.Header.Set("X-Msh-Device-Id", getKimiDeviceID())
if stream {
r.Header.Set("Accept", "text/event-stream")
return
}
r.Header.Set("Accept", "application/json")
}
func resolveKimiDeviceIDFromAuth(auth *cliproxyauth.Auth) string {
if auth == nil || auth.Metadata == nil {
return ""
}
deviceIDRaw, ok := auth.Metadata["device_id"]
if !ok {
return ""
}
deviceID, ok := deviceIDRaw.(string)
if !ok {
return ""
}
return strings.TrimSpace(deviceID)
}
func resolveKimiDeviceIDFromStorage(auth *cliproxyauth.Auth) string {
if auth == nil {
return ""
}
storage, ok := auth.Storage.(*kimiauth.KimiTokenStorage)
if !ok || storage == nil {
return ""
}
return strings.TrimSpace(storage.DeviceID)
}
func resolveKimiDeviceID(auth *cliproxyauth.Auth) string {
deviceID := resolveKimiDeviceIDFromAuth(auth)
if deviceID != "" {
return deviceID
}
return resolveKimiDeviceIDFromStorage(auth)
}
func applyKimiHeadersWithAuth(r *http.Request, token string, stream bool, auth *cliproxyauth.Auth) {
applyKimiHeaders(r, token, stream)
if deviceID := resolveKimiDeviceID(auth); deviceID != "" {
r.Header.Set("X-Msh-Device-Id", deviceID)
}
}
// getKimiHostname returns the machine hostname.
func getKimiHostname() string {
hostname, err := os.Hostname()
if err != nil {
return "unknown"
}
return hostname
}
// getKimiDeviceModel returns a device model string matching kimi-cli format.
func getKimiDeviceModel() string {
return fmt.Sprintf("%s %s", runtime.GOOS, runtime.GOARCH)
}
// getKimiDeviceID returns a stable device ID, matching kimi-cli storage location.
func getKimiDeviceID() string {
homeDir, err := os.UserHomeDir()
if err != nil {
return "cli-proxy-api-device"
}
// Check kimi-cli's device_id location first (platform-specific)
var kimiShareDir string
switch runtime.GOOS {
case "darwin":
kimiShareDir = filepath.Join(homeDir, "Library", "Application Support", "kimi")
case "windows":
appData := os.Getenv("APPDATA")
if appData == "" {
appData = filepath.Join(homeDir, "AppData", "Roaming")
}
kimiShareDir = filepath.Join(appData, "kimi")
default: // linux and other unix-like
kimiShareDir = filepath.Join(homeDir, ".local", "share", "kimi")
}
deviceIDPath := filepath.Join(kimiShareDir, "device_id")
if data, err := os.ReadFile(deviceIDPath); err == nil {
return strings.TrimSpace(string(data))
}
return "cli-proxy-api-device"
}
// kimiCreds extracts the access token from auth.
func kimiCreds(a *cliproxyauth.Auth) (token string) {
if a == nil {
return ""
}
// Check metadata first (OAuth flow stores tokens here)
if a.Metadata != nil {
if v, ok := a.Metadata["access_token"].(string); ok && strings.TrimSpace(v) != "" {
return v
}
}
// Fallback to attributes (API key style)
if a.Attributes != nil {
if v := a.Attributes["access_token"]; v != "" {
return v
}
if v := a.Attributes["api_key"]; v != "" {
return v
}
}
return ""
}
// stripKimiPrefix removes the "kimi-" prefix from model names for the upstream API.
func stripKimiPrefix(model string) string {
model = strings.TrimSpace(model)
if strings.HasPrefix(strings.ToLower(model), "kimi-") {
return model[5:]
}
return model
}