mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-02-03 13:00:52 +08:00
When from == to (the Claude→Claude scenario), forward the SSE stream directly,
line by line, without invoking TranslateStream. This preserves the
multi-line SSE event structure (event: / data: / blank line) and prevents
the JSON parsing errors caused by event fragmentation.
Resolves: JSON parsing error when using Claude Code streaming responses
fix: correct SSE event formatting in Handler layer
Remove duplicate newline additions (\n\n) that were breaking SSE event format.
The Executor layer already provides properly formatted SSE chunks with correct
line endings, so the Handler should forward them as-is without modification.
Changes:
- Remove redundant \n\n addition after each chunk
- Add len(chunk) > 0 check before writing
- Format error messages as proper SSE events (event: error\ndata: {...}\n\n)
- Add chunkIdx counter for future debugging needs
This fixes JSON parsing errors caused by malformed SSE event streams.
fix: update comments for clarity in SSE event forwarding
347 lines
12 KiB
Go
347 lines
12 KiB
Go
package executor
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/klauspost/compress/zstd"
|
|
claudeauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/claude"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/misc"
|
|
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
|
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
|
|
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/tidwall/gjson"
|
|
"github.com/tidwall/sjson"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
)
|
|
|
|
// ClaudeExecutor is a stateless executor for Anthropic Claude over the messages API.
|
|
// If api_key is unavailable on auth, it falls back to legacy via ClientAdapter.
|
|
type ClaudeExecutor struct {
|
|
cfg *config.Config
|
|
}
|
|
|
|
// NewClaudeExecutor builds a ClaudeExecutor that keeps cfg for later use.
func NewClaudeExecutor(cfg *config.Config) *ClaudeExecutor {
	executor := &ClaudeExecutor{cfg: cfg}
	return executor
}
|
|
|
|
// Identifier returns the fixed provider name of this executor, "claude".
func (e *ClaudeExecutor) Identifier() string {
	const name = "claude"
	return name
}
|
|
|
|
// PrepareRequest is a no-op: both arguments are ignored and nil is always returned.
func (e *ClaudeExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error {
	var noErr error
	return noErr
}
|
|
|
|
func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
|
apiKey, baseURL := claudeCreds(auth)
|
|
|
|
if baseURL == "" {
|
|
baseURL = "https://api.anthropic.com"
|
|
}
|
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
|
from := opts.SourceFormat
|
|
to := sdktranslator.FromString("claude")
|
|
// Use streaming translation to preserve function calling, except for claude.
|
|
stream := from != to
|
|
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), stream)
|
|
|
|
if !strings.HasPrefix(req.Model, "claude-3-5-haiku") {
|
|
body, _ = sjson.SetRawBytes(body, "system", []byte(misc.ClaudeCodeInstructions))
|
|
}
|
|
|
|
url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL)
|
|
recordAPIRequest(ctx, e.cfg, body)
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, err
|
|
}
|
|
applyClaudeHeaders(httpReq, apiKey, false)
|
|
|
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
|
resp, err := httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, err
|
|
}
|
|
defer func() {
|
|
if errClose := resp.Body.Close(); errClose != nil {
|
|
log.Errorf("response body close error: %v", errClose)
|
|
}
|
|
}()
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
b, _ := io.ReadAll(resp.Body)
|
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
|
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
|
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
|
|
}
|
|
reader := io.Reader(resp.Body)
|
|
var decoder *zstd.Decoder
|
|
if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) {
|
|
decoder, err = zstd.NewReader(resp.Body)
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err)
|
|
}
|
|
reader = decoder
|
|
defer decoder.Close()
|
|
}
|
|
data, err := io.ReadAll(reader)
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, err
|
|
}
|
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
|
if stream {
|
|
lines := bytes.Split(data, []byte("\n"))
|
|
for _, line := range lines {
|
|
if detail, ok := parseClaudeStreamUsage(line); ok {
|
|
reporter.publish(ctx, detail)
|
|
}
|
|
}
|
|
} else {
|
|
reporter.publish(ctx, parseClaudeUsage(data))
|
|
}
|
|
var param any
|
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m)
|
|
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
|
}
|
|
|
|
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
|
apiKey, baseURL := claudeCreds(auth)
|
|
|
|
if baseURL == "" {
|
|
baseURL = "https://api.anthropic.com"
|
|
}
|
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
|
from := opts.SourceFormat
|
|
to := sdktranslator.FromString("claude")
|
|
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
|
body, _ = sjson.SetRawBytes(body, "system", []byte(misc.ClaudeCodeInstructions))
|
|
|
|
url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL)
|
|
recordAPIRequest(ctx, e.cfg, body)
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
applyClaudeHeaders(httpReq, apiKey, true)
|
|
|
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
|
resp, err := httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
defer func() { _ = resp.Body.Close() }()
|
|
b, _ := io.ReadAll(resp.Body)
|
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
|
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
|
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
|
|
}
|
|
out := make(chan cliproxyexecutor.StreamChunk)
|
|
go func() {
|
|
defer close(out)
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
// If from == to (Claude → Claude), directly forward the SSE stream without translation
|
|
if from == to {
|
|
scanner := bufio.NewScanner(resp.Body)
|
|
buf := make([]byte, 20_971_520)
|
|
scanner.Buffer(buf, 20_971_520)
|
|
for scanner.Scan() {
|
|
line := scanner.Bytes()
|
|
appendAPIResponseChunk(ctx, e.cfg, line)
|
|
if detail, ok := parseClaudeStreamUsage(line); ok {
|
|
reporter.publish(ctx, detail)
|
|
}
|
|
// Forward the line as-is to preserve SSE format
|
|
cloned := make([]byte, len(line)+1)
|
|
copy(cloned, line)
|
|
cloned[len(line)] = '\n'
|
|
out <- cliproxyexecutor.StreamChunk{Payload: cloned}
|
|
}
|
|
if err = scanner.Err(); err != nil {
|
|
out <- cliproxyexecutor.StreamChunk{Err: err}
|
|
}
|
|
return
|
|
}
|
|
|
|
// For other formats, use translation
|
|
scanner := bufio.NewScanner(resp.Body)
|
|
buf := make([]byte, 20_971_520)
|
|
scanner.Buffer(buf, 20_971_520)
|
|
var param any
|
|
for scanner.Scan() {
|
|
line := scanner.Bytes()
|
|
appendAPIResponseChunk(ctx, e.cfg, line)
|
|
if detail, ok := parseClaudeStreamUsage(line); ok {
|
|
reporter.publish(ctx, detail)
|
|
}
|
|
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m)
|
|
for i := range chunks {
|
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
|
}
|
|
}
|
|
if err = scanner.Err(); err != nil {
|
|
out <- cliproxyexecutor.StreamChunk{Err: err}
|
|
}
|
|
}()
|
|
return out, nil
|
|
}
|
|
|
|
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
|
apiKey, baseURL := claudeCreds(auth)
|
|
|
|
if baseURL == "" {
|
|
baseURL = "https://api.anthropic.com"
|
|
}
|
|
|
|
from := opts.SourceFormat
|
|
to := sdktranslator.FromString("claude")
|
|
// Use streaming translation to preserve function calling, except for claude.
|
|
stream := from != to
|
|
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), stream)
|
|
|
|
if !strings.HasPrefix(req.Model, "claude-3-5-haiku") {
|
|
body, _ = sjson.SetRawBytes(body, "system", []byte(misc.ClaudeCodeInstructions))
|
|
}
|
|
|
|
url := fmt.Sprintf("%s/v1/messages/count_tokens?beta=true", baseURL)
|
|
recordAPIRequest(ctx, e.cfg, body)
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, err
|
|
}
|
|
applyClaudeHeaders(httpReq, apiKey, false)
|
|
|
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
|
resp, err := httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, err
|
|
}
|
|
defer func() {
|
|
if errClose := resp.Body.Close(); errClose != nil {
|
|
log.Errorf("response body close error: %v", errClose)
|
|
}
|
|
}()
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
b, _ := io.ReadAll(resp.Body)
|
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
|
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
|
|
}
|
|
reader := io.Reader(resp.Body)
|
|
var decoder *zstd.Decoder
|
|
if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) {
|
|
decoder, err = zstd.NewReader(resp.Body)
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err)
|
|
}
|
|
reader = decoder
|
|
defer decoder.Close()
|
|
}
|
|
data, err := io.ReadAll(reader)
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, err
|
|
}
|
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
|
count := gjson.GetBytes(data, "input_tokens").Int()
|
|
out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data)
|
|
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
|
}
|
|
|
|
func (e *ClaudeExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) {
|
|
log.Debugf("claude executor: refresh called")
|
|
if auth == nil {
|
|
return nil, fmt.Errorf("claude executor: auth is nil")
|
|
}
|
|
var refreshToken string
|
|
if auth.Metadata != nil {
|
|
if v, ok := auth.Metadata["refresh_token"].(string); ok && v != "" {
|
|
refreshToken = v
|
|
}
|
|
}
|
|
if refreshToken == "" {
|
|
return auth, nil
|
|
}
|
|
svc := claudeauth.NewClaudeAuth(e.cfg)
|
|
td, err := svc.RefreshTokens(ctx, refreshToken)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if auth.Metadata == nil {
|
|
auth.Metadata = make(map[string]any)
|
|
}
|
|
auth.Metadata["access_token"] = td.AccessToken
|
|
if td.RefreshToken != "" {
|
|
auth.Metadata["refresh_token"] = td.RefreshToken
|
|
}
|
|
auth.Metadata["email"] = td.Email
|
|
auth.Metadata["expired"] = td.Expire
|
|
auth.Metadata["type"] = "claude"
|
|
now := time.Now().Format(time.RFC3339)
|
|
auth.Metadata["last_refresh"] = now
|
|
return auth, nil
|
|
}
|
|
|
|
// hasZSTDEcoding reports whether contentEncoding, a comma-separated
// Content-Encoding header value, lists "zstd" as one of its entries.
// Entries are compared case-insensitively after trimming surrounding
// whitespace; an empty header yields false.
func hasZSTDEcoding(contentEncoding string) bool {
	// Splitting "" yields a single empty entry, which never matches, so no
	// separate empty-string guard is required.
	for _, entry := range strings.Split(contentEncoding, ",") {
		if strings.EqualFold(strings.TrimSpace(entry), "zstd") {
			return true
		}
	}
	return false
}
|
|
|
|
func applyClaudeHeaders(r *http.Request, apiKey string, stream bool) {
|
|
r.Header.Set("Authorization", "Bearer "+apiKey)
|
|
r.Header.Set("Content-Type", "application/json")
|
|
r.Header.Set("Anthropic-Beta", "claude-code-20250219,oauth-2025-04-20,interleaved-thinking-2025-05-14,fine-grained-tool-streaming-2025-05-14")
|
|
|
|
var ginHeaders http.Header
|
|
if ginCtx, ok := r.Context().Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil {
|
|
ginHeaders = ginCtx.Request.Header
|
|
}
|
|
|
|
misc.EnsureHeader(r.Header, ginHeaders, "Anthropic-Version", "2023-06-01")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "Anthropic-Dangerous-Direct-Browser-Access", "true")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-App", "cli")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Helper-Method", "stream")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Retry-Count", "0")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Runtime-Version", "v24.3.0")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Package-Version", "0.55.1")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Runtime", "node")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Lang", "js")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Arch", "arm64")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Os", "MacOS")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Stainless-Timeout", "60")
|
|
r.Header.Set("Connection", "keep-alive")
|
|
r.Header.Set("User-Agent", "claude-cli/1.0.83 (external, cli)")
|
|
r.Header.Set("Accept-Encoding", "gzip, deflate, br, zstd")
|
|
if stream {
|
|
r.Header.Set("Accept", "text/event-stream")
|
|
return
|
|
}
|
|
r.Header.Set("Accept", "application/json")
|
|
}
|
|
|
|
func claudeCreds(a *cliproxyauth.Auth) (apiKey, baseURL string) {
|
|
if a == nil {
|
|
return "", ""
|
|
}
|
|
if a.Attributes != nil {
|
|
apiKey = a.Attributes["api_key"]
|
|
baseURL = a.Attributes["base_url"]
|
|
}
|
|
if apiKey == "" && a.Metadata != nil {
|
|
if v, ok := a.Metadata["access_token"].(string); ok {
|
|
apiKey = v
|
|
}
|
|
}
|
|
return
|
|
}
|