v6 version first commit

This commit is contained in:
Luis Pater
2025-09-22 01:40:24 +08:00
parent d42384cdb7
commit 4999fce7f4
171 changed files with 7626 additions and 7494 deletions

View File

@@ -5,12 +5,16 @@ package handlers
import (
"fmt"
"sync"
"net/http"
"github.com/gin-gonic/gin"
"github.com/luispater/CLIProxyAPI/v5/internal/config"
"github.com/luispater/CLIProxyAPI/v5/internal/interfaces"
"github.com/luispater/CLIProxyAPI/v5/internal/util"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces"
"github.com/router-for-me/CLIProxyAPI/v6/internal/runtime/executor"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
coreexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
"golang.org/x/net/context"
)
@@ -38,18 +42,11 @@ type ErrorDetail struct {
// It holds a pool of clients to interact with the backend service and manages
// load balancing, client selection, and configuration.
type BaseAPIHandler struct {
// CliClients is the pool of available AI service clients.
CliClients []interfaces.Client
// AuthManager manages auth lifecycle and execution in the new architecture.
AuthManager *coreauth.Manager
// Cfg holds the current application configuration.
Cfg *config.Config
// Mutex ensures thread-safe access to shared resources.
Mutex *sync.Mutex
// LastUsedClientIndex tracks the last used client index for each provider
// to implement round-robin load balancing.
LastUsedClientIndex map[string]int
}
// NewBaseAPIHandlers creates a new API handlers instance.
@@ -61,12 +58,10 @@ type BaseAPIHandler struct {
//
// Returns:
// - *BaseAPIHandler: A new API handlers instance
func NewBaseAPIHandlers(cliClients []interfaces.Client, cfg *config.Config) *BaseAPIHandler {
func NewBaseAPIHandlers(cfg *config.Config, authManager *coreauth.Manager) *BaseAPIHandler {
return &BaseAPIHandler{
CliClients: cliClients,
Cfg: cfg,
Mutex: &sync.Mutex{},
LastUsedClientIndex: make(map[string]int),
Cfg: cfg,
AuthManager: authManager,
}
}
@@ -76,86 +71,7 @@ func NewBaseAPIHandlers(cliClients []interfaces.Client, cfg *config.Config) *Bas
// Parameters:
// - clients: The new slice of AI service clients
// - cfg: The new application configuration
func (h *BaseAPIHandler) UpdateClients(clients []interfaces.Client, cfg *config.Config) {
h.CliClients = clients
h.Cfg = cfg
}
// GetClient returns an available client from the pool using round-robin load balancing.
// It checks for quota limits and tries to find an unlocked client for immediate use.
// The modelName parameter is used to check quota status for specific models.
//
// Parameters:
// - modelName: The name of the model to be used
// - isGenerateContent: Optional parameter to indicate if this is for content generation
//
// Returns:
// - client.Client: An available client for the requested model
// - *client.ErrorMessage: An error message if no client is available
func (h *BaseAPIHandler) GetClient(modelName string, isGenerateContent ...bool) (interfaces.Client, *interfaces.ErrorMessage) {
clients := make([]interfaces.Client, 0)
for i := 0; i < len(h.CliClients); i++ {
if h.CliClients[i].CanProvideModel(modelName) && h.CliClients[i].IsAvailable() && !h.CliClients[i].IsModelQuotaExceeded(modelName) {
clients = append(clients, h.CliClients[i])
}
}
// Lock the mutex to update the last used client index
h.Mutex.Lock()
if _, hasKey := h.LastUsedClientIndex[modelName]; !hasKey {
h.LastUsedClientIndex[modelName] = 0
}
if len(clients) == 0 {
h.Mutex.Unlock()
return nil, &interfaces.ErrorMessage{StatusCode: 500, Error: fmt.Errorf("no clients available")}
}
var cliClient interfaces.Client
startIndex := h.LastUsedClientIndex[modelName]
if (len(isGenerateContent) > 0 && isGenerateContent[0]) || len(isGenerateContent) == 0 {
currentIndex := (startIndex + 1) % len(clients)
h.LastUsedClientIndex[modelName] = currentIndex
}
h.Mutex.Unlock()
// Reorder the client to start from the last used index
reorderedClients := make([]interfaces.Client, 0)
for i := 0; i < len(clients); i++ {
cliClient = clients[(startIndex+1+i)%len(clients)]
reorderedClients = append(reorderedClients, cliClient)
}
if len(reorderedClients) == 0 {
if util.GetProviderName(modelName, h.Cfg) == "claude" {
// log.Debugf("Claude Model %s is quota exceeded for all accounts", modelName)
return nil, &interfaces.ErrorMessage{StatusCode: 429, Error: fmt.Errorf(`{"type":"error","error":{"type":"rate_limit_error","message":"This request would exceed your account's rate limit. Please try again later."}}`)}
}
return nil, &interfaces.ErrorMessage{StatusCode: 429, Error: fmt.Errorf(`{"error":{"code":429,"message":"All the models of '%s' are quota exceeded","status":"RESOURCE_EXHAUSTED"}}`, modelName)}
}
locked := false
for i := 0; i < len(reorderedClients); i++ {
cliClient = reorderedClients[i]
if mutex := cliClient.GetRequestMutex(); mutex != nil {
if mutex.TryLock() {
locked = true
break
}
} else {
locked = true
}
}
if !locked {
cliClient = clients[0]
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Lock()
}
}
return cliClient, nil
}
func (h *BaseAPIHandler) UpdateClients(cfg *config.Config) { h.Cfg = cfg }
// GetAlt extracts the 'alt' parameter from the request query string.
// It checks both 'alt' and '$alt' parameters and returns the appropriate value.
@@ -215,6 +131,109 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c *
}
}
// ExecuteWithAuthManager executes a non-streaming request via the core auth manager.
// This path is the only supported execution route.
func (h *BaseAPIHandler) ExecuteWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, *interfaces.ErrorMessage) {
providers := util.GetProviderName(modelName, h.Cfg)
if len(providers) == 0 {
return nil, &interfaces.ErrorMessage{StatusCode: http.StatusBadRequest, Error: fmt.Errorf("unknown provider for model %s", modelName)}
}
req := coreexecutor.Request{
Model: modelName,
Payload: cloneBytes(rawJSON),
}
opts := coreexecutor.Options{
Stream: false,
Alt: alt,
OriginalRequest: cloneBytes(rawJSON),
SourceFormat: sdktranslator.FromString(handlerType),
}
resp, err := h.AuthManager.Execute(ctx, providers, req, opts)
if err != nil {
if msg, ok := executor.UnwrapError(err); ok {
return nil, msg
}
return nil, &interfaces.ErrorMessage{StatusCode: http.StatusInternalServerError, Error: err}
}
return cloneBytes(resp.Payload), nil
}
// ExecuteStreamWithAuthManager executes a streaming request via the core auth manager.
// This path is the only supported execution route.
func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) (<-chan []byte, <-chan *interfaces.ErrorMessage) {
providers := util.GetProviderName(modelName, h.Cfg)
if len(providers) == 0 {
errChan := make(chan *interfaces.ErrorMessage, 1)
errChan <- &interfaces.ErrorMessage{StatusCode: http.StatusBadRequest, Error: fmt.Errorf("unknown provider for model %s", modelName)}
close(errChan)
return nil, errChan
}
req := coreexecutor.Request{
Model: modelName,
Payload: cloneBytes(rawJSON),
}
opts := coreexecutor.Options{
Stream: true,
Alt: alt,
OriginalRequest: cloneBytes(rawJSON),
SourceFormat: sdktranslator.FromString(handlerType),
}
chunks, err := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
if err != nil {
errChan := make(chan *interfaces.ErrorMessage, 1)
if msg, ok := executor.UnwrapError(err); ok {
errChan <- msg
} else {
errChan <- &interfaces.ErrorMessage{StatusCode: http.StatusInternalServerError, Error: err}
}
close(errChan)
return nil, errChan
}
dataChan := make(chan []byte)
errChan := make(chan *interfaces.ErrorMessage, 1)
go func() {
defer close(dataChan)
defer close(errChan)
for chunk := range chunks {
if chunk.Err != nil {
if msg, ok := executor.UnwrapError(chunk.Err); ok {
errChan <- msg
} else {
errChan <- &interfaces.ErrorMessage{StatusCode: http.StatusInternalServerError, Error: chunk.Err}
}
return
}
if len(chunk.Payload) > 0 {
dataChan <- cloneBytes(chunk.Payload)
}
}
}()
return dataChan, errChan
}
func cloneBytes(src []byte) []byte {
if len(src) == 0 {
return nil
}
dst := make([]byte, len(src))
copy(dst, src)
return dst
}
// WriteErrorResponse writes an error message to the response writer using the HTTP status embedded in the message.
func (h *BaseAPIHandler) WriteErrorResponse(c *gin.Context, msg *interfaces.ErrorMessage) {
status := http.StatusInternalServerError
if msg != nil && msg.StatusCode > 0 {
status = msg.StatusCode
}
c.Status(status)
if msg != nil && msg.Error != nil {
_, _ = c.Writer.Write([]byte(msg.Error.Error()))
} else {
_, _ = c.Writer.Write([]byte(http.StatusText(status)))
}
}
func (h *BaseAPIHandler) LoggingAPIResponseError(ctx context.Context, err *interfaces.ErrorMessage) {
if h.Cfg.RequestLog {
if ginContext, ok := ctx.Value("gin").(*gin.Context); ok {