package executor

import (
	"bufio"
	"bytes"
	"compress/flate"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/andybalholm/brotli"
	"github.com/gin-gonic/gin"
	"github.com/klauspost/compress/zstd"
	claudeauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/claude"
	"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
	"github.com/router-for-me/CLIProxyAPI/v6/internal/misc"
	cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
	cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
	sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
	log "github.com/sirupsen/logrus"
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
)

const (
	// claudeDefaultBaseURL is used when the auth entry carries no base_url.
	claudeDefaultBaseURL = "https://api.anthropic.com"

	// claudeScannerInitialBufSize is the starting capacity of the SSE line
	// scanner; bufio grows it on demand up to claudeScannerMaxTokenSize.
	claudeScannerInitialBufSize = 64 * 1024

	// claudeScannerMaxTokenSize bounds a single SSE line (20 MiB).
	claudeScannerMaxTokenSize = 20_971_520
)

// ClaudeExecutor is a stateless executor for Anthropic Claude over the messages API.
// Credentials are taken from the auth entry by claudeCreds: the "api_key"
// attribute is preferred, falling back to the "access_token" metadata value.
type ClaudeExecutor struct {
	cfg *config.Config
}

// NewClaudeExecutor returns a ClaudeExecutor bound to cfg.
func NewClaudeExecutor(cfg *config.Config) *ClaudeExecutor { return &ClaudeExecutor{cfg: cfg} }

// Identifier returns the provider name, "claude".
func (e *ClaudeExecutor) Identifier() string { return "claude" }

// PrepareRequest is a no-op for this executor and always returns nil.
func (e *ClaudeExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }

// Execute performs a single non-streaming call to /v1/messages and returns the
// response translated back into opts.SourceFormat.
//
// A non-2xx upstream status is returned as a statusErr carrying the status code
// and the (decoded) response body. Usage is reported through the usage
// reporter; any returned error is tracked as a failure.
func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
	apiKey, baseURL := claudeCreds(auth)
	if baseURL == "" {
		baseURL = claudeDefaultBaseURL
	}

	reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
	defer reporter.trackFailure(ctx, &err)

	from := opts.SourceFormat
	to := sdktranslator.FromString("claude")
	// Use streaming translation to preserve function calling, except for claude.
	stream := from != to
	body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), stream)
	modelForUpstream := req.Model
	if modelOverride := e.resolveUpstreamModel(req.Model, auth); modelOverride != "" {
		body, _ = sjson.SetBytes(body, "model", modelOverride)
		modelForUpstream = modelOverride
	}

	if !strings.HasPrefix(modelForUpstream, "claude-3-5-haiku") {
		body, _ = sjson.SetRawBytes(body, "system", []byte(misc.ClaudeCodeInstructions))
	}

	url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return resp, err
	}
	applyClaudeHeaders(httpReq, apiKey, false)
	e.recordUpstreamRequest(ctx, auth, url, httpReq.Header, body)

	httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
	httpResp, err := httpClient.Do(httpReq)
	if err != nil {
		recordAPIResponseError(ctx, e.cfg, err)
		return resp, err
	}
	recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
	if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
		b := readClaudeErrorBody(httpResp)
		appendAPIResponseChunk(ctx, e.cfg, b)
		log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
		err = statusErr{code: httpResp.StatusCode, msg: string(b)}
		return resp, err
	}

	decodedBody, err := decodeResponseBody(httpResp.Body, httpResp.Header.Get("Content-Encoding"))
	if err != nil {
		recordAPIResponseError(ctx, e.cfg, err)
		if errClose := httpResp.Body.Close(); errClose != nil {
			log.Errorf("response body close error: %v", errClose)
		}
		return resp, err
	}
	defer func() {
		if errClose := decodedBody.Close(); errClose != nil {
			log.Errorf("response body close error: %v", errClose)
		}
	}()

	data, err := io.ReadAll(decodedBody)
	if err != nil {
		recordAPIResponseError(ctx, e.cfg, err)
		return resp, err
	}
	appendAPIResponseChunk(ctx, e.cfg, data)

	if stream {
		// The request was translated in streaming mode, so usage is looked up
		// line by line rather than in a single JSON document.
		for _, line := range bytes.Split(data, []byte("\n")) {
			if detail, ok := parseClaudeStreamUsage(line); ok {
				reporter.publish(ctx, detail)
			}
		}
	} else {
		reporter.publish(ctx, parseClaudeUsage(data))
	}

	var param any
	out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
	resp = cliproxyexecutor.Response{Payload: []byte(out)}
	return resp, nil
}

// ExecuteStream performs a streaming call to /v1/messages and returns a channel
// of chunks. The channel is closed when the upstream stream ends, a scan error
// occurs, or ctx is cancelled.
//
// When opts.SourceFormat is already "claude" each upstream line is forwarded
// as-is (with its trailing newline restored); otherwise each line is passed
// through the stream translator. A scan error is delivered as a final chunk
// with Err set.
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
	apiKey, baseURL := claudeCreds(auth)
	if baseURL == "" {
		baseURL = claudeDefaultBaseURL
	}

	reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
	defer reporter.trackFailure(ctx, &err)

	from := opts.SourceFormat
	to := sdktranslator.FromString("claude")
	body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
	if modelOverride := e.resolveUpstreamModel(req.Model, auth); modelOverride != "" {
		body, _ = sjson.SetBytes(body, "model", modelOverride)
	}
	// NOTE(review): unlike Execute and CountTokens, the system prompt is injected
	// here for every model, including claude-3-5-haiku — confirm this is intentional.
	body, _ = sjson.SetRawBytes(body, "system", []byte(misc.ClaudeCodeInstructions))

	url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	applyClaudeHeaders(httpReq, apiKey, true)
	e.recordUpstreamRequest(ctx, auth, url, httpReq.Header, body)

	httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
	httpResp, err := httpClient.Do(httpReq)
	if err != nil {
		recordAPIResponseError(ctx, e.cfg, err)
		return nil, err
	}
	recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
	if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
		b := readClaudeErrorBody(httpResp)
		appendAPIResponseChunk(ctx, e.cfg, b)
		log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
		err = statusErr{code: httpResp.StatusCode, msg: string(b)}
		return nil, err
	}

	decodedBody, err := decodeResponseBody(httpResp.Body, httpResp.Header.Get("Content-Encoding"))
	if err != nil {
		recordAPIResponseError(ctx, e.cfg, err)
		if errClose := httpResp.Body.Close(); errClose != nil {
			log.Errorf("response body close error: %v", errClose)
		}
		return nil, err
	}

	out := make(chan cliproxyexecutor.StreamChunk)
	stream = out
	go func() {
		defer close(out)
		defer func() {
			if errClose := decodedBody.Close(); errClose != nil {
				log.Errorf("response body close error: %v", errClose)
			}
		}()

		// send delivers one chunk, giving up when ctx is cancelled so this
		// goroutine (and the response body it owns) cannot leak behind a
		// consumer that has stopped reading.
		send := func(chunk cliproxyexecutor.StreamChunk) bool {
			select {
			case out <- chunk:
				return true
			case <-ctx.Done():
				return false
			}
		}
		// fail records a terminal stream error and marks the usage as failed.
		fail := func(cause error) {
			recordAPIResponseError(ctx, e.cfg, cause)
			reporter.publishFailure(ctx)
		}

		// If from == to (Claude → Claude), the SSE stream is forwarded without translation.
		passthrough := from == to

		scanner := bufio.NewScanner(decodedBody)
		// Start small and let the scanner grow; only the maximum needs to be large.
		scanner.Buffer(make([]byte, 0, claudeScannerInitialBufSize), claudeScannerMaxTokenSize)
		var param any
		for scanner.Scan() {
			line := scanner.Bytes()
			appendAPIResponseChunk(ctx, e.cfg, line)
			if detail, ok := parseClaudeStreamUsage(line); ok {
				reporter.publish(ctx, detail)
			}

			if passthrough {
				// Forward the line as-is to preserve SSE format. The scanner reuses
				// its buffer, so the payload must be a copy; the newline the scanner
				// stripped is put back.
				cloned := make([]byte, len(line)+1)
				copy(cloned, line)
				cloned[len(line)] = '\n'
				if !send(cliproxyexecutor.StreamChunk{Payload: cloned}) {
					fail(ctx.Err())
					return
				}
				continue
			}

			chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), &param)
			for i := range chunks {
				if !send(cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}) {
					fail(ctx.Err())
					return
				}
			}
		}
		if errScan := scanner.Err(); errScan != nil {
			fail(errScan)
			send(cliproxyexecutor.StreamChunk{Err: errScan})
		}
	}()
	return stream, nil
}

// CountTokens calls /v1/messages/count_tokens and returns the "input_tokens"
// value from the upstream response, translated into opts.SourceFormat.
//
// A non-2xx upstream status is returned as a statusErr carrying the status code
// and the (decoded) response body.
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
	apiKey, baseURL := claudeCreds(auth)
	if baseURL == "" {
		baseURL = claudeDefaultBaseURL
	}

	from := opts.SourceFormat
	to := sdktranslator.FromString("claude")
	// Use streaming translation to preserve function calling, except for claude.
	stream := from != to
	body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), stream)
	modelForUpstream := req.Model
	if modelOverride := e.resolveUpstreamModel(req.Model, auth); modelOverride != "" {
		body, _ = sjson.SetBytes(body, "model", modelOverride)
		modelForUpstream = modelOverride
	}

	if !strings.HasPrefix(modelForUpstream, "claude-3-5-haiku") {
		body, _ = sjson.SetRawBytes(body, "system", []byte(misc.ClaudeCodeInstructions))
	}

	url := fmt.Sprintf("%s/v1/messages/count_tokens?beta=true", baseURL)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return cliproxyexecutor.Response{}, err
	}
	applyClaudeHeaders(httpReq, apiKey, false)
	e.recordUpstreamRequest(ctx, auth, url, httpReq.Header, body)

	httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
	httpResp, err := httpClient.Do(httpReq)
	if err != nil {
		recordAPIResponseError(ctx, e.cfg, err)
		return cliproxyexecutor.Response{}, err
	}
	recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
	if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
		b := readClaudeErrorBody(httpResp)
		appendAPIResponseChunk(ctx, e.cfg, b)
		return cliproxyexecutor.Response{}, statusErr{code: httpResp.StatusCode, msg: string(b)}
	}

	decodedBody, err := decodeResponseBody(httpResp.Body, httpResp.Header.Get("Content-Encoding"))
	if err != nil {
		recordAPIResponseError(ctx, e.cfg, err)
		if errClose := httpResp.Body.Close(); errClose != nil {
			log.Errorf("response body close error: %v", errClose)
		}
		return cliproxyexecutor.Response{}, err
	}
	defer func() {
		if errClose := decodedBody.Close(); errClose != nil {
			log.Errorf("response body close error: %v", errClose)
		}
	}()

	data, err := io.ReadAll(decodedBody)
	if err != nil {
		recordAPIResponseError(ctx, e.cfg, err)
		return cliproxyexecutor.Response{}, err
	}
	appendAPIResponseChunk(ctx, e.cfg, data)

	count := gjson.GetBytes(data, "input_tokens").Int()
	out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data)
	return cliproxyexecutor.Response{Payload: []byte(out)}, nil
}

// Refresh exchanges the refresh token stored in auth.Metadata for new tokens
// and writes the result back into auth.Metadata.
//
// It returns an error when auth is nil or the token refresh fails. When no
// non-empty "refresh_token" string is present, auth is returned unchanged.
func (e *ClaudeExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) {
	log.Debugf("claude executor: refresh called")
	if auth == nil {
		return nil, fmt.Errorf("claude executor: auth is nil")
	}

	var refreshToken string
	if auth.Metadata != nil {
		if v, ok := auth.Metadata["refresh_token"].(string); ok && v != "" {
			refreshToken = v
		}
	}
	if refreshToken == "" {
		return auth, nil
	}

	svc := claudeauth.NewClaudeAuth(e.cfg)
	td, err := svc.RefreshTokens(ctx, refreshToken)
	if err != nil {
		return nil, err
	}

	if auth.Metadata == nil {
		auth.Metadata = make(map[string]any)
	}
	auth.Metadata["access_token"] = td.AccessToken
	// Keep the previous refresh token when the server does not rotate it.
	if td.RefreshToken != "" {
		auth.Metadata["refresh_token"] = td.RefreshToken
	}
	auth.Metadata["email"] = td.Email
	auth.Metadata["expired"] = td.Expire
	auth.Metadata["type"] = "claude"
	auth.Metadata["last_refresh"] = time.Now().Format(time.RFC3339)
	return auth, nil
}

// recordUpstreamRequest logs the outgoing upstream request together with the
// identifying fields of auth (left empty when auth is nil).
func (e *ClaudeExecutor) recordUpstreamRequest(ctx context.Context, auth *cliproxyauth.Auth, url string, headers http.Header, body []byte) {
	var authID, authLabel, authType, authValue string
	if auth != nil {
		authID = auth.ID
		authLabel = auth.Label
		authType, authValue = auth.AccountInfo()
	}
	recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
		URL:       url,
		Method:    http.MethodPost,
		Headers:   headers.Clone(),
		Body:      body,
		Provider:  e.Identifier(),
		AuthID:    authID,
		AuthLabel: authLabel,
		AuthType:  authType,
		AuthValue: authValue,
	})
}

// resolveUpstreamModel maps a requested model name to the upstream model name
// configured for auth's Claude key entry. It returns "" when alias is empty,
// no config entry matches auth, or no model in the entry matches.
//
// Matching is case-insensitive. An entry with an alias is matched only by its
// alias; an entry without an alias is matched by its name.
func (e *ClaudeExecutor) resolveUpstreamModel(alias string, auth *cliproxyauth.Auth) string {
	if alias == "" {
		return ""
	}
	entry := e.resolveClaudeConfig(auth)
	if entry == nil {
		return ""
	}
	for i := range entry.Models {
		model := entry.Models[i]
		name := strings.TrimSpace(model.Name)
		modelAlias := strings.TrimSpace(model.Alias)
		if modelAlias != "" {
			if strings.EqualFold(modelAlias, alias) {
				if name != "" {
					return name
				}
				return alias
			}
			continue
		}
		if name != "" && strings.EqualFold(name, alias) {
			return name
		}
	}
	return ""
}

// resolveClaudeConfig finds the configured Claude key entry that corresponds to
// auth, comparing the "api_key" and "base_url" attributes case-insensitively
// after trimming whitespace. It returns nil when auth or the config is nil or
// nothing matches.
//
// Precedence: key+base match when both attributes are set; otherwise a key
// match whose configured base is empty or equal; otherwise a base-only match
// when no key attribute is set. As a last resort any entry with the same key
// is returned regardless of base URL.
func (e *ClaudeExecutor) resolveClaudeConfig(auth *cliproxyauth.Auth) *config.ClaudeKey {
	if auth == nil || e.cfg == nil {
		return nil
	}
	var attrKey, attrBase string
	if auth.Attributes != nil {
		attrKey = strings.TrimSpace(auth.Attributes["api_key"])
		attrBase = strings.TrimSpace(auth.Attributes["base_url"])
	}
	for i := range e.cfg.ClaudeKey {
		entry := &e.cfg.ClaudeKey[i]
		cfgKey := strings.TrimSpace(entry.APIKey)
		cfgBase := strings.TrimSpace(entry.BaseURL)
		if attrKey != "" && attrBase != "" {
			if strings.EqualFold(cfgKey, attrKey) && strings.EqualFold(cfgBase, attrBase) {
				return entry
			}
			continue
		}
		if attrKey != "" && strings.EqualFold(cfgKey, attrKey) {
			if cfgBase == "" || strings.EqualFold(cfgBase, attrBase) {
				return entry
			}
		}
		if attrKey == "" && attrBase != "" && strings.EqualFold(cfgBase, attrBase) {
			return entry
		}
	}
	if attrKey != "" {
		for i := range e.cfg.ClaudeKey {
			entry := &e.cfg.ClaudeKey[i]
			if strings.EqualFold(strings.TrimSpace(entry.APIKey), attrKey) {
				return entry
			}
		}
	}
	return nil
}

// compositeReadCloser pairs a decoding Reader with the close functions of every
// layer beneath it, so one Close call releases them all.
type compositeReadCloser struct {
	io.Reader
	closers []func() error
}

// Close runs every closer in order, skipping nil entries, and returns the
// first error encountered. Later closers still run after an earlier failure.
func (c *compositeReadCloser) Close() error {
	var firstErr error
	for i := range c.closers {
		if c.closers[i] == nil {
			continue
		}
		if err := c.closers[i](); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// decodeResponseBody wraps body in a decompressor chosen from the
// Content-Encoding header value. The first recognised token among gzip,
// deflate, br and zstd wins; empty, "identity" and unrecognised tokens are
// skipped. When nothing applies, body is returned unchanged.
//
// Closing the returned reader also closes body. If creating the gzip or zstd
// decoder fails, body is closed and an error is returned. A nil body is an error.
func decodeResponseBody(body io.ReadCloser, contentEncoding string) (io.ReadCloser, error) {
	if body == nil {
		return nil, fmt.Errorf("response body is nil")
	}
	if contentEncoding == "" {
		return body, nil
	}
	for _, raw := range strings.Split(contentEncoding, ",") {
		switch strings.TrimSpace(strings.ToLower(raw)) {
		case "", "identity":
			continue
		case "gzip":
			gzipReader, err := gzip.NewReader(body)
			if err != nil {
				_ = body.Close() // the constructor error is the one worth reporting
				return nil, fmt.Errorf("failed to create gzip reader: %w", err)
			}
			return &compositeReadCloser{
				Reader:  gzipReader,
				closers: []func() error{gzipReader.Close, body.Close},
			}, nil
		case "deflate":
			deflateReader := flate.NewReader(body)
			return &compositeReadCloser{
				Reader:  deflateReader,
				closers: []func() error{deflateReader.Close, body.Close},
			}, nil
		case "br":
			return &compositeReadCloser{
				Reader:  brotli.NewReader(body),
				closers: []func() error{body.Close},
			}, nil
		case "zstd":
			decoder, err := zstd.NewReader(body)
			if err != nil {
				_ = body.Close() // the constructor error is the one worth reporting
				return nil, fmt.Errorf("failed to create zstd reader: %w", err)
			}
			return &compositeReadCloser{
				Reader: decoder,
				closers: []func() error{
					// zstd's Close has no error return; adapt it.
					func() error { decoder.Close(); return nil },
					body.Close,
				},
			}, nil
		default:
			continue
		}
	}
	return body, nil
}

// readClaudeErrorBody drains and closes resp.Body and returns its content for
// error reporting. applyClaudeHeaders sets Accept-Encoding explicitly, so
// net/http does not decompress responses for us; the body is therefore decoded
// according to Content-Encoding here. If decoding is not possible the raw
// bytes are returned instead, so diagnostics are never lost.
func readClaudeErrorBody(resp *http.Response) []byte {
	raw, _ := io.ReadAll(resp.Body) // best effort: a partial body is still useful for diagnostics
	if errClose := resp.Body.Close(); errClose != nil {
		log.Errorf("response body close error: %v", errClose)
	}
	encoding := resp.Header.Get("Content-Encoding")
	if encoding == "" || len(raw) == 0 {
		return raw
	}
	decoder, err := decodeResponseBody(io.NopCloser(bytes.NewReader(raw)), encoding)
	if err != nil {
		return raw
	}
	defer func() { _ = decoder.Close() }() // in-memory source; nothing actionable on close failure
	decoded, err := io.ReadAll(decoder)
	if err != nil {
		return raw
	}
	return decoded
}

// applyClaudeHeaders sets the headers for an upstream Claude request: bearer
// authorization, JSON content type, the client-identification defaults below,
// and an Accept header of text/event-stream when stream is true or
// application/json otherwise.
//
// The EnsureHeader calls receive the inbound gin request's headers when a
// *gin.Context is stored in the request context under the key "gin".
// NOTE(review): presumably EnsureHeader prefers an existing or inbound value
// over the listed default — confirm against misc.EnsureHeader.
func applyClaudeHeaders(r *http.Request, apiKey string, stream bool) {
	r.Header.Set("Authorization", "Bearer "+apiKey)
	r.Header.Set("Content-Type", "application/json")

	var ginHeaders http.Header
	if ginCtx, ok := r.Context().Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil {
		ginHeaders = ginCtx.Request.Header
	}

	misc.EnsureHeader(r.Header, ginHeaders, "Anthropic-Beta", "claude-code-20250219,oauth-2025-04-20,interleaved-thinking-2025-05-14,fine-grained-tool-streaming-2025-05-14")
	misc.EnsureHeader(r.Header, ginHeaders, "Anthropic-Version", "2023-06-01")
	misc.EnsureHeader(r.Header, ginHeaders, "Anthropic-Dangerous-Direct-Browser-Access", "true")
	misc.EnsureHeader(r.Header, ginHeaders, "X-App", "cli")
	misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Helper-Method", "stream")
	misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Retry-Count", "0")
	misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Runtime-Version", "v24.3.0")
	misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Package-Version", "0.55.1")
	misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Runtime", "node")
	misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Lang", "js")
	misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Arch", "arm64")
	misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Os", "MacOS")
	misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Timeout", "60")
	misc.EnsureHeader(r.Header, ginHeaders, "User-Agent", "claude-cli/1.0.83 (external, cli)")
	r.Header.Set("Connection", "keep-alive")
	// Setting Accept-Encoding explicitly disables net/http's transparent
	// decompression; callers decode the body with decodeResponseBody.
	r.Header.Set("Accept-Encoding", "gzip, deflate, br, zstd")
	if stream {
		r.Header.Set("Accept", "text/event-stream")
		return
	}
	r.Header.Set("Accept", "application/json")
}

// claudeCreds extracts the upstream credential and base URL from a. The
// "api_key" and "base_url" attributes are read first; when no API key is set,
// the "access_token" metadata string is used instead. Both results are empty
// for a nil auth.
func claudeCreds(a *cliproxyauth.Auth) (apiKey, baseURL string) {
	if a == nil {
		return "", ""
	}
	if a.Attributes != nil {
		apiKey = a.Attributes["api_key"]
		baseURL = a.Attributes["base_url"]
	}
	if apiKey == "" && a.Metadata != nil {
		if v, ok := a.Metadata["access_token"].(string); ok {
			apiKey = v
		}
	}
	return
}