feat(runtime): add Brotli and Zstd compression support, improve response handling

- Implemented Brotli and Zstd decompression handling in `FileRequestLogger` and executor logic for enhanced compatibility.
- Added `decodeResponseBody` utility for streamlined multi-encoding support (Gzip, Deflate, Brotli, Zstd).
- Improved resource cleanup with composite readers for proper closure under all conditions.
- Updated dependencies in `go.mod` and `go.sum` to include Brotli and Zstd libraries.
This commit is contained in:
Luis Pater
2025-10-28 08:39:03 +08:00
parent c7196ba7dc
commit 847c2502a5
4 changed files with 180 additions and 45 deletions

1
go.mod
View File

@@ -28,6 +28,7 @@ require (
cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ProtonMail/go-crypto v1.3.0 // indirect github.com/ProtonMail/go-crypto v1.3.0 // indirect
github.com/andybalholm/brotli v1.0.6 // indirect
github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cloudflare/circl v1.6.1 // indirect github.com/cloudflare/circl v1.6.1 // indirect

2
go.sum
View File

@@ -4,6 +4,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw= github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw=
github.com/ProtonMail/go-crypto v1.3.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE= github.com/ProtonMail/go-crypto v1.3.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE=
github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI=
github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=

View File

@@ -15,6 +15,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/andybalholm/brotli"
"github.com/klauspost/compress/zstd"
log "github.com/sirupsen/logrus"
"github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces" "github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util" "github.com/router-for-me/CLIProxyAPI/v6/internal/util"
) )
@@ -411,6 +415,10 @@ func (l *FileRequestLogger) decompressResponse(responseHeaders map[string][]stri
return l.decompressGzip(response) return l.decompressGzip(response)
case "deflate": case "deflate":
return l.decompressDeflate(response) return l.decompressDeflate(response)
case "br":
return l.decompressBrotli(response)
case "zstd":
return l.decompressZstd(response)
default: default:
// No compression or unsupported compression // No compression or unsupported compression
return response, nil return response, nil
@@ -431,7 +439,9 @@ func (l *FileRequestLogger) decompressGzip(data []byte) ([]byte, error) {
return nil, fmt.Errorf("failed to create gzip reader: %w", err) return nil, fmt.Errorf("failed to create gzip reader: %w", err)
} }
defer func() { defer func() {
_ = reader.Close() if errClose := reader.Close(); errClose != nil {
log.WithError(errClose).Warn("failed to close gzip reader in request logger")
}
}() }()
decompressed, err := io.ReadAll(reader) decompressed, err := io.ReadAll(reader)
@@ -453,7 +463,9 @@ func (l *FileRequestLogger) decompressGzip(data []byte) ([]byte, error) {
func (l *FileRequestLogger) decompressDeflate(data []byte) ([]byte, error) { func (l *FileRequestLogger) decompressDeflate(data []byte) ([]byte, error) {
reader := flate.NewReader(bytes.NewReader(data)) reader := flate.NewReader(bytes.NewReader(data))
defer func() { defer func() {
_ = reader.Close() if errClose := reader.Close(); errClose != nil {
log.WithError(errClose).Warn("failed to close deflate reader in request logger")
}
}() }()
decompressed, err := io.ReadAll(reader) decompressed, err := io.ReadAll(reader)
@@ -464,6 +476,48 @@ func (l *FileRequestLogger) decompressDeflate(data []byte) ([]byte, error) {
return decompressed, nil return decompressed, nil
} }
// decompressBrotli decompresses brotli-encoded data.
//
// Parameters:
// - data: The brotli-encoded data to decompress
//
// Returns:
// - []byte: The decompressed data
// - error: An error if decompression fails, nil otherwise
func (l *FileRequestLogger) decompressBrotli(data []byte) ([]byte, error) {
reader := brotli.NewReader(bytes.NewReader(data))
decompressed, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("failed to decompress brotli data: %w", err)
}
return decompressed, nil
}
// decompressZstd decompresses zstd-encoded data.
//
// Parameters:
// - data: The zstd-encoded data to decompress
//
// Returns:
// - []byte: The decompressed data
// - error: An error if decompression fails, nil otherwise
func (l *FileRequestLogger) decompressZstd(data []byte) ([]byte, error) {
decoder, err := zstd.NewReader(bytes.NewReader(data))
if err != nil {
return nil, fmt.Errorf("failed to create zstd reader: %w", err)
}
defer decoder.Close()
decompressed, err := io.ReadAll(decoder)
if err != nil {
return nil, fmt.Errorf("failed to decompress zstd data: %w", err)
}
return decompressed, nil
}
// formatRequestInfo creates the request information section of the log. // formatRequestInfo creates the request information section of the log.
// //
// Parameters: // Parameters:

View File

@@ -3,6 +3,8 @@ package executor
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"compress/flate"
"compress/gzip"
"context" "context"
"fmt" "fmt"
"io" "io"
@@ -10,6 +12,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/andybalholm/brotli"
"github.com/klauspost/compress/zstd" "github.com/klauspost/compress/zstd"
claudeauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/claude" claudeauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/claude"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/config"
@@ -89,31 +92,31 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return resp, err return resp, err
} }
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body) b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)} err = statusErr{code: httpResp.StatusCode, msg: string(b)}
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
return resp, err return resp, err
} }
reader := io.Reader(httpResp.Body) decodedBody, err := decodeResponseBody(httpResp.Body, httpResp.Header.Get("Content-Encoding"))
var decoder *zstd.Decoder
if hasZSTDEcoding(httpResp.Header.Get("Content-Encoding")) {
decoder, err = zstd.NewReader(httpResp.Body)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return resp, fmt.Errorf("failed to initialize zstd decoder: %w", err) if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
} }
reader = decoder return resp, err
defer decoder.Close()
} }
data, err := io.ReadAll(reader) defer func() {
if errClose := decodedBody.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
}()
data, err := io.ReadAll(decodedBody)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return resp, err return resp, err
@@ -192,19 +195,27 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
err = statusErr{code: httpResp.StatusCode, msg: string(b)} err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err return nil, err
} }
decodedBody, err := decodeResponseBody(httpResp.Body, httpResp.Header.Get("Content-Encoding"))
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk) out := make(chan cliproxyexecutor.StreamChunk)
stream = out stream = out
go func() { go func() {
defer close(out) defer close(out)
defer func() { defer func() {
if errClose := httpResp.Body.Close(); errClose != nil { if errClose := decodedBody.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose) log.Errorf("response body close error: %v", errClose)
} }
}() }()
// If from == to (Claude → Claude), directly forward the SSE stream without translation // If from == to (Claude → Claude), directly forward the SSE stream without translation
if from == to { if from == to {
scanner := bufio.NewScanner(httpResp.Body) scanner := bufio.NewScanner(decodedBody)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520) scanner.Buffer(buf, 20_971_520)
for scanner.Scan() { for scanner.Scan() {
@@ -228,7 +239,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
} }
// For other formats, use translation // For other formats, use translation
scanner := bufio.NewScanner(httpResp.Body) scanner := bufio.NewScanner(decodedBody)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520) scanner.Buffer(buf, 20_971_520)
var param any var param any
@@ -304,29 +315,29 @@ func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Aut
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return cliproxyexecutor.Response{}, err
} }
defer func() {
if errClose := resp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body) b, _ := io.ReadAll(resp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
if errClose := resp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
} }
reader := io.Reader(resp.Body) decodedBody, err := decodeResponseBody(resp.Body, resp.Header.Get("Content-Encoding"))
var decoder *zstd.Decoder
if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) {
decoder, err = zstd.NewReader(resp.Body)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err) if errClose := resp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
} }
reader = decoder return cliproxyexecutor.Response{}, err
defer decoder.Close()
} }
data, err := io.ReadAll(reader) defer func() {
if errClose := decodedBody.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
}()
data, err := io.ReadAll(decodedBody)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return cliproxyexecutor.Response{}, err
@@ -419,7 +430,7 @@ func (e *ClaudeExecutor) resolveClaudeConfig(auth *cliproxyauth.Auth) *config.Cl
continue continue
} }
if attrKey != "" && strings.EqualFold(cfgKey, attrKey) { if attrKey != "" && strings.EqualFold(cfgKey, attrKey) {
if attrBase == "" || cfgBase == "" || strings.EqualFold(cfgBase, attrBase) { if cfgBase == "" || strings.EqualFold(cfgBase, attrBase) {
return entry return entry
} }
} }
@@ -438,17 +449,84 @@ func (e *ClaudeExecutor) resolveClaudeConfig(auth *cliproxyauth.Auth) *config.Cl
return nil return nil
} }
func hasZSTDEcoding(contentEncoding string) bool { type compositeReadCloser struct {
io.Reader
closers []func() error
}
func (c *compositeReadCloser) Close() error {
var firstErr error
for i := range c.closers {
if c.closers[i] == nil {
continue
}
if err := c.closers[i](); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}
func decodeResponseBody(body io.ReadCloser, contentEncoding string) (io.ReadCloser, error) {
if body == nil {
return nil, fmt.Errorf("response body is nil")
}
if contentEncoding == "" { if contentEncoding == "" {
return false return body, nil
} }
parts := strings.Split(contentEncoding, ",") encodings := strings.Split(contentEncoding, ",")
for i := range parts { for _, raw := range encodings {
if strings.EqualFold(strings.TrimSpace(parts[i]), "zstd") { encoding := strings.TrimSpace(strings.ToLower(raw))
return true switch encoding {
case "", "identity":
continue
case "gzip":
gzipReader, err := gzip.NewReader(body)
if err != nil {
_ = body.Close()
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
}
return &compositeReadCloser{
Reader: gzipReader,
closers: []func() error{
gzipReader.Close,
func() error { return body.Close() },
},
}, nil
case "deflate":
deflateReader := flate.NewReader(body)
return &compositeReadCloser{
Reader: deflateReader,
closers: []func() error{
deflateReader.Close,
func() error { return body.Close() },
},
}, nil
case "br":
return &compositeReadCloser{
Reader: brotli.NewReader(body),
closers: []func() error{
func() error { return body.Close() },
},
}, nil
case "zstd":
decoder, err := zstd.NewReader(body)
if err != nil {
_ = body.Close()
return nil, fmt.Errorf("failed to create zstd reader: %w", err)
}
return &compositeReadCloser{
Reader: decoder,
closers: []func() error{
func() error { decoder.Close(); return nil },
func() error { return body.Close() },
},
}, nil
default:
continue
} }
} }
return false return body, nil
} }
func applyClaudeHeaders(r *http.Request, apiKey string, stream bool) { func applyClaudeHeaders(r *http.Request, apiKey string, stream bool) {