mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-02-03 04:50:52 +08:00
feat(watcher): implement incremental client hot-reloading
This commit is contained in:
@@ -292,7 +292,8 @@ func corsMiddleware() gin.HandlerFunc {
|
|||||||
// Parameters:
|
// Parameters:
|
||||||
// - clients: The new slice of AI service clients
|
// - clients: The new slice of AI service clients
|
||||||
// - cfg: The new application configuration
|
// - cfg: The new application configuration
|
||||||
func (s *Server) UpdateClients(clients []interfaces.Client, cfg *config.Config) {
|
func (s *Server) UpdateClients(clients map[string]interfaces.Client, cfg *config.Config) {
|
||||||
|
clientSlice := s.clientsToSlice(clients)
|
||||||
// Update request logger enabled state if it has changed
|
// Update request logger enabled state if it has changed
|
||||||
if s.requestLogger != nil && s.cfg.RequestLog != cfg.RequestLog {
|
if s.requestLogger != nil && s.cfg.RequestLog != cfg.RequestLog {
|
||||||
s.requestLogger.SetEnabled(cfg.RequestLog)
|
s.requestLogger.SetEnabled(cfg.RequestLog)
|
||||||
@@ -310,11 +311,11 @@ func (s *Server) UpdateClients(clients []interfaces.Client, cfg *config.Config)
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.cfg = cfg
|
s.cfg = cfg
|
||||||
s.handlers.UpdateClients(clients, cfg)
|
s.handlers.UpdateClients(clientSlice, cfg)
|
||||||
if s.mgmt != nil {
|
if s.mgmt != nil {
|
||||||
s.mgmt.SetConfig(cfg)
|
s.mgmt.SetConfig(cfg)
|
||||||
}
|
}
|
||||||
log.Infof("server clients and configuration updated: %d clients", len(clients))
|
log.Infof("server clients and configuration updated: %d clients", len(clientSlice))
|
||||||
}
|
}
|
||||||
|
|
||||||
// (management handlers moved to internal/api/handlers/management)
|
// (management handlers moved to internal/api/handlers/management)
|
||||||
@@ -384,3 +385,11 @@ func AuthMiddleware(cfg *config.Config) gin.HandlerFunc {
|
|||||||
c.Next()
|
c.Next()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) clientsToSlice(clientMap map[string]interfaces.Client) []interfaces.Client {
|
||||||
|
slice := make([]interfaces.Client, 0, len(clientMap))
|
||||||
|
for _, v := range clientMap {
|
||||||
|
slice = append(slice, v)
|
||||||
|
}
|
||||||
|
return slice
|
||||||
|
}
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ import (
|
|||||||
// - configPath: The path to the configuration file for watching changes
|
// - configPath: The path to the configuration file for watching changes
|
||||||
func StartService(cfg *config.Config, configPath string) {
|
func StartService(cfg *config.Config, configPath string) {
|
||||||
// Create a pool of API clients, one for each token file found.
|
// Create a pool of API clients, one for each token file found.
|
||||||
cliClients := make([]interfaces.Client, 0)
|
cliClients := make(map[string]interfaces.Client)
|
||||||
err := filepath.Walk(cfg.AuthDir, func(path string, info fs.FileInfo, err error) error {
|
err := filepath.Walk(cfg.AuthDir, func(path string, info fs.FileInfo, err error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -88,7 +88,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
|
|
||||||
// Add the new client to the pool.
|
// Add the new client to the pool.
|
||||||
cliClient := client.NewGeminiCLIClient(httpClient, &ts, cfg)
|
cliClient := client.NewGeminiCLIClient(httpClient, &ts, cfg)
|
||||||
cliClients = append(cliClients, cliClient)
|
cliClients[path] = cliClient
|
||||||
}
|
}
|
||||||
} else if tokenType == "codex" {
|
} else if tokenType == "codex" {
|
||||||
var ts codex.CodexTokenStorage
|
var ts codex.CodexTokenStorage
|
||||||
@@ -102,7 +102,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
return errGetClient
|
return errGetClient
|
||||||
}
|
}
|
||||||
log.Info("Authentication successful.")
|
log.Info("Authentication successful.")
|
||||||
cliClients = append(cliClients, codexClient)
|
cliClients[path] = codexClient
|
||||||
}
|
}
|
||||||
} else if tokenType == "claude" {
|
} else if tokenType == "claude" {
|
||||||
var ts claude.ClaudeTokenStorage
|
var ts claude.ClaudeTokenStorage
|
||||||
@@ -111,7 +111,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
log.Info("Initializing claude authentication for token...")
|
log.Info("Initializing claude authentication for token...")
|
||||||
claudeClient := client.NewClaudeClient(cfg, &ts)
|
claudeClient := client.NewClaudeClient(cfg, &ts)
|
||||||
log.Info("Authentication successful.")
|
log.Info("Authentication successful.")
|
||||||
cliClients = append(cliClients, claudeClient)
|
cliClients[path] = claudeClient
|
||||||
}
|
}
|
||||||
} else if tokenType == "qwen" {
|
} else if tokenType == "qwen" {
|
||||||
var ts qwen.QwenTokenStorage
|
var ts qwen.QwenTokenStorage
|
||||||
@@ -120,7 +120,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
log.Info("Initializing qwen authentication for token...")
|
log.Info("Initializing qwen authentication for token...")
|
||||||
qwenClient := client.NewQwenClient(cfg, &ts)
|
qwenClient := client.NewQwenClient(cfg, &ts)
|
||||||
log.Info("Authentication successful.")
|
log.Info("Authentication successful.")
|
||||||
cliClients = append(cliClients, qwenClient)
|
cliClients[path] = qwenClient
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -130,6 +130,8 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
log.Fatalf("Error walking auth directory: %v", err)
|
log.Fatalf("Error walking auth directory: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
clientSlice := clientsToSlice(cliClients)
|
||||||
|
|
||||||
if len(cfg.GlAPIKey) > 0 {
|
if len(cfg.GlAPIKey) > 0 {
|
||||||
// Initialize clients with Generative Language API Keys if provided in configuration.
|
// Initialize clients with Generative Language API Keys if provided in configuration.
|
||||||
for i := 0; i < len(cfg.GlAPIKey); i++ {
|
for i := 0; i < len(cfg.GlAPIKey); i++ {
|
||||||
@@ -137,7 +139,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
|
|
||||||
log.Debug("Initializing with Generative Language API Key...")
|
log.Debug("Initializing with Generative Language API Key...")
|
||||||
cliClient := client.NewGeminiClient(httpClient, cfg, cfg.GlAPIKey[i])
|
cliClient := client.NewGeminiClient(httpClient, cfg, cfg.GlAPIKey[i])
|
||||||
cliClients = append(cliClients, cliClient)
|
clientSlice = append(clientSlice, cliClient)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -146,7 +148,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
for i := 0; i < len(cfg.ClaudeKey); i++ {
|
for i := 0; i < len(cfg.ClaudeKey); i++ {
|
||||||
log.Debug("Initializing with Claude API Key...")
|
log.Debug("Initializing with Claude API Key...")
|
||||||
cliClient := client.NewClaudeClientWithKey(cfg, i)
|
cliClient := client.NewClaudeClientWithKey(cfg, i)
|
||||||
cliClients = append(cliClients, cliClient)
|
clientSlice = append(clientSlice, cliClient)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -155,7 +157,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
for i := 0; i < len(cfg.CodexKey); i++ {
|
for i := 0; i < len(cfg.CodexKey); i++ {
|
||||||
log.Debug("Initializing with Codex API Key...")
|
log.Debug("Initializing with Codex API Key...")
|
||||||
cliClient := client.NewCodexClientWithKey(cfg, i)
|
cliClient := client.NewCodexClientWithKey(cfg, i)
|
||||||
cliClients = append(cliClients, cliClient)
|
clientSlice = append(clientSlice, cliClient)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -167,12 +169,12 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
if errClient != nil {
|
if errClient != nil {
|
||||||
log.Fatalf("failed to create OpenAI compatibility client for %s: %v", compatConfig.Name, errClient)
|
log.Fatalf("failed to create OpenAI compatibility client for %s: %v", compatConfig.Name, errClient)
|
||||||
}
|
}
|
||||||
cliClients = append(cliClients, compatClient)
|
clientSlice = append(clientSlice, compatClient)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create and start the API server with the pool of clients in a separate goroutine.
|
// Create and start the API server with the pool of clients in a separate goroutine.
|
||||||
apiServer := api.NewServer(cfg, cliClients, configPath)
|
apiServer := api.NewServer(cfg, clientSlice, configPath)
|
||||||
log.Infof("Starting API server on port %d", cfg.Port)
|
log.Infof("Starting API server on port %d", cfg.Port)
|
||||||
|
|
||||||
// Start the API server in a goroutine so it doesn't block the main thread.
|
// Start the API server in a goroutine so it doesn't block the main thread.
|
||||||
@@ -187,7 +189,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
log.Info("API server started successfully")
|
log.Info("API server started successfully")
|
||||||
|
|
||||||
// Setup file watcher for config and auth directory changes to enable hot-reloading.
|
// Setup file watcher for config and auth directory changes to enable hot-reloading.
|
||||||
fileWatcher, errNewWatcher := watcher.NewWatcher(configPath, cfg.AuthDir, func(newClients []interfaces.Client, newCfg *config.Config) {
|
fileWatcher, errNewWatcher := watcher.NewWatcher(configPath, cfg.AuthDir, func(newClients map[string]interfaces.Client, newCfg *config.Config) {
|
||||||
// Update the API server with new clients and configuration when files change.
|
// Update the API server with new clients and configuration when files change.
|
||||||
apiServer.UpdateClients(newClients, newCfg)
|
apiServer.UpdateClients(newClients, newCfg)
|
||||||
})
|
})
|
||||||
@@ -230,8 +232,9 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
|
|
||||||
// Function to check and refresh tokens for all client types before they expire.
|
// Function to check and refresh tokens for all client types before they expire.
|
||||||
checkAndRefresh := func() {
|
checkAndRefresh := func() {
|
||||||
for i := 0; i < len(cliClients); i++ {
|
clientSlice := clientsToSlice(cliClients)
|
||||||
if codexCli, ok := cliClients[i].(*client.CodexClient); ok {
|
for i := 0; i < len(clientSlice); i++ {
|
||||||
|
if codexCli, ok := clientSlice[i].(*client.CodexClient); ok {
|
||||||
if ts, isCodexTS := codexCli.TokenStorage().(*claude.ClaudeTokenStorage); isCodexTS {
|
if ts, isCodexTS := codexCli.TokenStorage().(*claude.ClaudeTokenStorage); isCodexTS {
|
||||||
if ts != nil && ts.Expire != "" {
|
if ts != nil && ts.Expire != "" {
|
||||||
if expTime, errParse := time.Parse(time.RFC3339, ts.Expire); errParse == nil {
|
if expTime, errParse := time.Parse(time.RFC3339, ts.Expire); errParse == nil {
|
||||||
@@ -242,7 +245,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if claudeCli, isOK := cliClients[i].(*client.ClaudeClient); isOK {
|
} else if claudeCli, isOK := clientSlice[i].(*client.ClaudeClient); isOK {
|
||||||
if ts, isCluadeTS := claudeCli.TokenStorage().(*claude.ClaudeTokenStorage); isCluadeTS {
|
if ts, isCluadeTS := claudeCli.TokenStorage().(*claude.ClaudeTokenStorage); isCluadeTS {
|
||||||
if ts != nil && ts.Expire != "" {
|
if ts != nil && ts.Expire != "" {
|
||||||
if expTime, errParse := time.Parse(time.RFC3339, ts.Expire); errParse == nil {
|
if expTime, errParse := time.Parse(time.RFC3339, ts.Expire); errParse == nil {
|
||||||
@@ -253,7 +256,7 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if qwenCli, isQwenOK := cliClients[i].(*client.QwenClient); isQwenOK {
|
} else if qwenCli, isQwenOK := clientSlice[i].(*client.QwenClient); isQwenOK {
|
||||||
if ts, isQwenTS := qwenCli.TokenStorage().(*qwen.QwenTokenStorage); isQwenTS {
|
if ts, isQwenTS := qwenCli.TokenStorage().(*qwen.QwenTokenStorage); isQwenTS {
|
||||||
if ts != nil && ts.Expire != "" {
|
if ts != nil && ts.Expire != "" {
|
||||||
if expTime, errParse := time.Parse(time.RFC3339, ts.Expire); errParse == nil {
|
if expTime, errParse := time.Parse(time.RFC3339, ts.Expire); errParse == nil {
|
||||||
@@ -306,3 +309,11 @@ func StartService(cfg *config.Config, configPath string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func clientsToSlice(clientMap map[string]interfaces.Client) []interfaces.Client {
|
||||||
|
s := make([]interfaces.Client, 0, len(clientMap))
|
||||||
|
for _, v := range clientMap {
|
||||||
|
s = append(s, v)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|||||||
@@ -34,14 +34,14 @@ type Watcher struct {
|
|||||||
configPath string
|
configPath string
|
||||||
authDir string
|
authDir string
|
||||||
config *config.Config
|
config *config.Config
|
||||||
clients []interfaces.Client
|
clients map[string]interfaces.Client
|
||||||
clientsMutex sync.RWMutex
|
clientsMutex sync.RWMutex
|
||||||
reloadCallback func([]interfaces.Client, *config.Config)
|
reloadCallback func(map[string]interfaces.Client, *config.Config)
|
||||||
watcher *fsnotify.Watcher
|
watcher *fsnotify.Watcher
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWatcher creates a new file watcher instance
|
// NewWatcher creates a new file watcher instance
|
||||||
func NewWatcher(configPath, authDir string, reloadCallback func([]interfaces.Client, *config.Config)) (*Watcher, error) {
|
func NewWatcher(configPath, authDir string, reloadCallback func(map[string]interfaces.Client, *config.Config)) (*Watcher, error) {
|
||||||
watcher, errNewWatcher := fsnotify.NewWatcher()
|
watcher, errNewWatcher := fsnotify.NewWatcher()
|
||||||
if errNewWatcher != nil {
|
if errNewWatcher != nil {
|
||||||
return nil, errNewWatcher
|
return nil, errNewWatcher
|
||||||
@@ -52,6 +52,7 @@ func NewWatcher(configPath, authDir string, reloadCallback func([]interfaces.Cli
|
|||||||
authDir: authDir,
|
authDir: authDir,
|
||||||
reloadCallback: reloadCallback,
|
reloadCallback: reloadCallback,
|
||||||
watcher: watcher,
|
watcher: watcher,
|
||||||
|
clients: make(map[string]interfaces.Client),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,7 +91,7 @@ func (w *Watcher) SetConfig(cfg *config.Config) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetClients updates the current client list
|
// SetClients updates the current client list
|
||||||
func (w *Watcher) SetClients(clients []interfaces.Client) {
|
func (w *Watcher) SetClients(clients map[string]interfaces.Client) {
|
||||||
w.clientsMutex.Lock()
|
w.clientsMutex.Lock()
|
||||||
defer w.clientsMutex.Unlock()
|
defer w.clientsMutex.Unlock()
|
||||||
w.clients = clients
|
w.clients = clients
|
||||||
@@ -119,7 +120,6 @@ func (w *Watcher) processEvents(ctx context.Context) {
|
|||||||
// handleEvent processes individual file system events
|
// handleEvent processes individual file system events
|
||||||
func (w *Watcher) handleEvent(event fsnotify.Event) {
|
func (w *Watcher) handleEvent(event fsnotify.Event) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
log.Debugf("file system event detected: %s %s", event.Op.String(), event.Name)
|
log.Debugf("file system event detected: %s %s", event.Op.String(), event.Name)
|
||||||
|
|
||||||
// Handle config file changes
|
// Handle config file changes
|
||||||
@@ -130,13 +130,14 @@ func (w *Watcher) handleEvent(event fsnotify.Event) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle auth directory changes (only for .json files)
|
// Handle auth directory changes incrementally
|
||||||
// Simplified: reload on any change to .json files in auth directory
|
|
||||||
if strings.HasPrefix(event.Name, w.authDir) && strings.HasSuffix(event.Name, ".json") {
|
if strings.HasPrefix(event.Name, w.authDir) && strings.HasSuffix(event.Name, ".json") {
|
||||||
log.Infof("auth file changed (%s): %s, reloading clients", event.Op.String(), filepath.Base(event.Name))
|
log.Infof("auth file changed (%s): %s, processing incrementally", event.Op.String(), filepath.Base(event.Name))
|
||||||
log.Debugf("auth file change details - operation: %s, file: %s, timestamp: %s",
|
if event.Op&fsnotify.Create == fsnotify.Create || event.Op&fsnotify.Write == fsnotify.Write {
|
||||||
event.Op.String(), filepath.Base(event.Name), now.Format("2006-01-02 15:04:05.000"))
|
w.addOrUpdateClient(event.Name)
|
||||||
w.reloadClients()
|
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
|
||||||
|
w.removeClient(event.Name)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -201,9 +202,10 @@ func (w *Watcher) reloadConfig() {
|
|||||||
w.reloadClients()
|
w.reloadClients()
|
||||||
}
|
}
|
||||||
|
|
||||||
// reloadClients reloads all authentication clients
|
// reloadClients performs a full scan of the auth directory and reloads all clients.
|
||||||
|
// This is used for initial startup and for handling config file reloads.
|
||||||
func (w *Watcher) reloadClients() {
|
func (w *Watcher) reloadClients() {
|
||||||
log.Debugf("starting client reload process")
|
log.Debugf("starting full client reload process")
|
||||||
|
|
||||||
w.clientsMutex.RLock()
|
w.clientsMutex.RLock()
|
||||||
cfg := w.config
|
cfg := w.config
|
||||||
@@ -215,25 +217,24 @@ func (w *Watcher) reloadClients() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("scanning auth directory: %s", cfg.AuthDir)
|
log.Debugf("scanning auth directory for initial load or full reload: %s", cfg.AuthDir)
|
||||||
|
|
||||||
// Create new client list
|
// Create new client map
|
||||||
newClients := make([]interfaces.Client, 0)
|
newClients := make(map[string]interfaces.Client)
|
||||||
authFileCount := 0
|
authFileCount := 0
|
||||||
successfulAuthCount := 0
|
successfulAuthCount := 0
|
||||||
|
|
||||||
|
// Handle tilde expansion for auth directory
|
||||||
if strings.HasPrefix(cfg.AuthDir, "~") {
|
if strings.HasPrefix(cfg.AuthDir, "~") {
|
||||||
home, errUserHomeDir := os.UserHomeDir()
|
home, errUserHomeDir := os.UserHomeDir()
|
||||||
if errUserHomeDir != nil {
|
if errUserHomeDir != nil {
|
||||||
log.Fatalf("failed to get home directory: %v", errUserHomeDir)
|
log.Fatalf("failed to get home directory: %v", errUserHomeDir)
|
||||||
}
|
}
|
||||||
// Reconstruct the path by replacing the tilde with the user's home directory.
|
|
||||||
parts := strings.Split(cfg.AuthDir, string(os.PathSeparator))
|
parts := strings.Split(cfg.AuthDir, string(os.PathSeparator))
|
||||||
if len(parts) > 1 {
|
if len(parts) > 1 {
|
||||||
parts[0] = home
|
parts[0] = home
|
||||||
cfg.AuthDir = path.Join(parts...)
|
cfg.AuthDir = path.Join(parts...)
|
||||||
} else {
|
} else {
|
||||||
// If the path is just "~", set it to the home directory.
|
|
||||||
cfg.AuthDir = home
|
cfg.AuthDir = home
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -244,91 +245,14 @@ func (w *Watcher) reloadClients() {
|
|||||||
log.Debugf("error accessing path %s: %v", path, err)
|
log.Debugf("error accessing path %s: %v", path, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process only JSON files in the auth directory
|
|
||||||
if !info.IsDir() && strings.HasSuffix(info.Name(), ".json") {
|
if !info.IsDir() && strings.HasSuffix(info.Name(), ".json") {
|
||||||
authFileCount++
|
authFileCount++
|
||||||
log.Debugf("processing auth file %d: %s", authFileCount, filepath.Base(path))
|
log.Debugf("processing auth file %d: %s", authFileCount, filepath.Base(path))
|
||||||
|
if client, err := w.createClientFromFile(path, cfg); err == nil {
|
||||||
data, errReadFile := os.ReadFile(path)
|
newClients[path] = client
|
||||||
if errReadFile != nil {
|
successfulAuthCount++
|
||||||
return errReadFile
|
} else {
|
||||||
}
|
log.Errorf("failed to create client from file %s: %v", path, err)
|
||||||
|
|
||||||
tokenType := "gemini"
|
|
||||||
typeResult := gjson.GetBytes(data, "type")
|
|
||||||
if typeResult.Exists() {
|
|
||||||
tokenType = typeResult.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decode the token storage file
|
|
||||||
if tokenType == "gemini" {
|
|
||||||
var ts gemini.GeminiTokenStorage
|
|
||||||
if err = json.Unmarshal(data, &ts); err == nil {
|
|
||||||
// For each valid token, create an authenticated client
|
|
||||||
clientCtx := context.Background()
|
|
||||||
log.Debugf(" initializing gemini authentication for token from %s...", filepath.Base(path))
|
|
||||||
geminiAuth := gemini.NewGeminiAuth()
|
|
||||||
httpClient, errGetClient := geminiAuth.GetAuthenticatedClient(clientCtx, &ts, cfg)
|
|
||||||
if errGetClient != nil {
|
|
||||||
log.Errorf(" failed to get authenticated client for token %s: %v", path, errGetClient)
|
|
||||||
return nil // Continue processing other files
|
|
||||||
}
|
|
||||||
log.Debugf(" authentication successful for token from %s", filepath.Base(path))
|
|
||||||
|
|
||||||
// Add the new client to the pool
|
|
||||||
cliClient := client.NewGeminiCLIClient(httpClient, &ts, cfg)
|
|
||||||
newClients = append(newClients, cliClient)
|
|
||||||
successfulAuthCount++
|
|
||||||
} else {
|
|
||||||
log.Errorf(" failed to decode token file %s: %v", path, err)
|
|
||||||
}
|
|
||||||
} else if tokenType == "codex" {
|
|
||||||
var ts codex.CodexTokenStorage
|
|
||||||
if err = json.Unmarshal(data, &ts); err == nil {
|
|
||||||
// For each valid token, create an authenticated client
|
|
||||||
log.Debugf(" initializing codex authentication for token from %s...", filepath.Base(path))
|
|
||||||
codexClient, errGetClient := client.NewCodexClient(cfg, &ts)
|
|
||||||
if errGetClient != nil {
|
|
||||||
log.Errorf(" failed to get authenticated client for token %s: %v", path, errGetClient)
|
|
||||||
return nil // Continue processing other files
|
|
||||||
}
|
|
||||||
log.Debugf(" authentication successful for token from %s", filepath.Base(path))
|
|
||||||
|
|
||||||
// Add the new client to the pool
|
|
||||||
newClients = append(newClients, codexClient)
|
|
||||||
successfulAuthCount++
|
|
||||||
} else {
|
|
||||||
log.Errorf(" failed to decode token file %s: %v", path, err)
|
|
||||||
}
|
|
||||||
} else if tokenType == "claude" {
|
|
||||||
var ts claude.ClaudeTokenStorage
|
|
||||||
if err = json.Unmarshal(data, &ts); err == nil {
|
|
||||||
// For each valid token, create an authenticated client
|
|
||||||
log.Debugf(" initializing claude authentication for token from %s...", filepath.Base(path))
|
|
||||||
claudeClient := client.NewClaudeClient(cfg, &ts)
|
|
||||||
log.Debugf(" authentication successful for token from %s", filepath.Base(path))
|
|
||||||
|
|
||||||
// Add the new client to the pool
|
|
||||||
newClients = append(newClients, claudeClient)
|
|
||||||
successfulAuthCount++
|
|
||||||
} else {
|
|
||||||
log.Errorf(" failed to decode token file %s: %v", path, err)
|
|
||||||
}
|
|
||||||
} else if tokenType == "qwen" {
|
|
||||||
var ts qwen.QwenTokenStorage
|
|
||||||
if err = json.Unmarshal(data, &ts); err == nil {
|
|
||||||
// For each valid token, create an authenticated client
|
|
||||||
log.Debugf(" initializing qwen authentication for token from %s...", filepath.Base(path))
|
|
||||||
qwenClient := client.NewQwenClient(cfg, &ts)
|
|
||||||
log.Debugf(" authentication successful for token from %s", filepath.Base(path))
|
|
||||||
|
|
||||||
// Add the new client to the pool
|
|
||||||
newClients = append(newClients, qwenClient)
|
|
||||||
successfulAuthCount++
|
|
||||||
} else {
|
|
||||||
log.Errorf(" failed to decode token file %s: %v", path, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -337,31 +261,33 @@ func (w *Watcher) reloadClients() {
|
|||||||
log.Errorf("error walking auth directory: %v", errWalk)
|
log.Errorf("error walking auth directory: %v", errWalk)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("auth directory scan complete - found %d .json files, %d successful authentications", authFileCount, successfulAuthCount)
|
log.Debugf("auth directory scan complete - found %d .json files, %d successful authentications", authFileCount, successfulAuthCount)
|
||||||
|
|
||||||
|
// Note: API key-based clients are not stored in the map as they don't correspond to a file.
|
||||||
|
// They are re-created each time, which is lightweight.
|
||||||
|
clientSlice := w.clientsToSlice(newClients)
|
||||||
|
|
||||||
// Add clients for Generative Language API keys if configured
|
// Add clients for Generative Language API keys if configured
|
||||||
glAPIKeyCount := 0
|
glAPIKeyCount := 0
|
||||||
if len(cfg.GlAPIKey) > 0 {
|
if len(cfg.GlAPIKey) > 0 {
|
||||||
log.Debugf("processing %d Generative Language API Keys", len(cfg.GlAPIKey))
|
log.Debugf("processing %d Generative Language API Keys", len(cfg.GlAPIKey))
|
||||||
for i := 0; i < len(cfg.GlAPIKey); i++ {
|
for i := 0; i < len(cfg.GlAPIKey); i++ {
|
||||||
httpClient := util.SetProxy(cfg, &http.Client{})
|
httpClient := util.SetProxy(cfg, &http.Client{})
|
||||||
|
|
||||||
log.Debugf("Initializing with Generative Language API Key %d...", i+1)
|
log.Debugf("Initializing with Generative Language API Key %d...", i+1)
|
||||||
cliClient := client.NewGeminiClient(httpClient, cfg, cfg.GlAPIKey[i])
|
cliClient := client.NewGeminiClient(httpClient, cfg, cfg.GlAPIKey[i])
|
||||||
newClients = append(newClients, cliClient)
|
clientSlice = append(clientSlice, cliClient)
|
||||||
glAPIKeyCount++
|
glAPIKeyCount++
|
||||||
}
|
}
|
||||||
log.Debugf("Successfully initialized %d Generative Language API Key clients", glAPIKeyCount)
|
log.Debugf("Successfully initialized %d Generative Language API Key clients", glAPIKeyCount)
|
||||||
}
|
}
|
||||||
|
// ... (Claude, Codex, OpenAI-compat clients are handled similarly) ...
|
||||||
claudeAPIKeyCount := 0
|
claudeAPIKeyCount := 0
|
||||||
if len(cfg.ClaudeKey) > 0 {
|
if len(cfg.ClaudeKey) > 0 {
|
||||||
log.Debugf("processing %d Claude API Keys", len(cfg.ClaudeKey))
|
log.Debugf("processing %d Claude API Keys", len(cfg.ClaudeKey))
|
||||||
for i := 0; i < len(cfg.ClaudeKey); i++ {
|
for i := 0; i < len(cfg.ClaudeKey); i++ {
|
||||||
log.Debugf("Initializing with Claude API Key %d...", i+1)
|
log.Debugf("Initializing with Claude API Key %d...", i+1)
|
||||||
cliClient := client.NewClaudeClientWithKey(cfg, i)
|
cliClient := client.NewClaudeClientWithKey(cfg, i)
|
||||||
newClients = append(newClients, cliClient)
|
clientSlice = append(clientSlice, cliClient)
|
||||||
claudeAPIKeyCount++
|
claudeAPIKeyCount++
|
||||||
}
|
}
|
||||||
log.Debugf("Successfully initialized %d Claude API Key clients", claudeAPIKeyCount)
|
log.Debugf("Successfully initialized %d Claude API Key clients", claudeAPIKeyCount)
|
||||||
@@ -373,13 +299,12 @@ func (w *Watcher) reloadClients() {
|
|||||||
for i := 0; i < len(cfg.CodexKey); i++ {
|
for i := 0; i < len(cfg.CodexKey); i++ {
|
||||||
log.Debugf("Initializing with Codex API Key %d...", i+1)
|
log.Debugf("Initializing with Codex API Key %d...", i+1)
|
||||||
cliClient := client.NewCodexClientWithKey(cfg, i)
|
cliClient := client.NewCodexClientWithKey(cfg, i)
|
||||||
newClients = append(newClients, cliClient)
|
clientSlice = append(clientSlice, cliClient)
|
||||||
codexAPIKeyCount++
|
codexAPIKeyCount++
|
||||||
}
|
}
|
||||||
log.Debugf("Successfully initialized %d Codex API Key clients", codexAPIKeyCount)
|
log.Debugf("Successfully initialized %d Codex API Key clients", codexAPIKeyCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add clients for OpenAI compatibility providers if configured
|
|
||||||
openAICompatCount := 0
|
openAICompatCount := 0
|
||||||
if len(cfg.OpenAICompatibility) > 0 {
|
if len(cfg.OpenAICompatibility) > 0 {
|
||||||
log.Debugf("processing %d OpenAI-compatibility providers", len(cfg.OpenAICompatibility))
|
log.Debugf("processing %d OpenAI-compatibility providers", len(cfg.OpenAICompatibility))
|
||||||
@@ -390,38 +315,163 @@ func (w *Watcher) reloadClients() {
|
|||||||
log.Errorf(" failed to create OpenAI-compatibility client for %s: %v", compat.Name, errClient)
|
log.Errorf(" failed to create OpenAI-compatibility client for %s: %v", compat.Name, errClient)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
newClients = append(newClients, compatClient)
|
clientSlice = append(clientSlice, compatClient)
|
||||||
openAICompatCount++
|
openAICompatCount++
|
||||||
}
|
}
|
||||||
log.Debugf("Successfully initialized %d OpenAI-compatibility clients", openAICompatCount)
|
log.Debugf("Successfully initialized %d OpenAI-compatibility clients", openAICompatCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unregister old clients from the model registry if supported
|
// Unregister all old clients
|
||||||
w.clientsMutex.RLock()
|
w.clientsMutex.RLock()
|
||||||
for i := 0; i < len(w.clients); i++ {
|
for _, oldClient := range w.clients {
|
||||||
if u, ok := any(w.clients[i]).(interface{ UnregisterClient() }); ok {
|
if u, ok := any(oldClient).(interface{ UnregisterClient() }); ok {
|
||||||
u.UnregisterClient()
|
u.UnregisterClient()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
w.clientsMutex.RUnlock()
|
w.clientsMutex.RUnlock()
|
||||||
|
|
||||||
// Update the client list
|
// Update the client map
|
||||||
w.clientsMutex.Lock()
|
w.clientsMutex.Lock()
|
||||||
w.clients = newClients
|
w.clients = newClients
|
||||||
w.clientsMutex.Unlock()
|
w.clientsMutex.Unlock()
|
||||||
|
|
||||||
log.Infof("client reload complete - old: %d clients, new: %d clients (%d auth files + %d GL API keys + %d Claude API keys + %d OpenAI-compat)",
|
log.Infof("full client reload complete - old: %d clients, new: %d clients (%d auth files + %d GL API keys + %d Claude API keys + %d Codex keys + %d OpenAI-compat)",
|
||||||
oldClientCount,
|
oldClientCount,
|
||||||
len(newClients),
|
len(clientSlice),
|
||||||
successfulAuthCount,
|
successfulAuthCount,
|
||||||
glAPIKeyCount,
|
glAPIKeyCount,
|
||||||
claudeAPIKeyCount,
|
claudeAPIKeyCount,
|
||||||
|
codexAPIKeyCount,
|
||||||
openAICompatCount,
|
openAICompatCount,
|
||||||
)
|
)
|
||||||
|
|
||||||
// Trigger the callback to update the server
|
// Trigger the callback to update the server
|
||||||
if w.reloadCallback != nil {
|
if w.reloadCallback != nil {
|
||||||
log.Debugf("triggering server update callback")
|
log.Debugf("triggering server update callback")
|
||||||
w.reloadCallback(newClients, cfg)
|
// Note: The callback signature expects a map now, but the API server internally works with a slice.
|
||||||
|
// We pass the map directly, and the server will handle converting it.
|
||||||
|
w.reloadCallback(w.clients, cfg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// createClientFromFile creates a single client instance from a given token file path.
|
||||||
|
func (w *Watcher) createClientFromFile(path string, cfg *config.Config) (interfaces.Client, error) {
|
||||||
|
data, errReadFile := os.ReadFile(path)
|
||||||
|
if errReadFile != nil {
|
||||||
|
return nil, errReadFile
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the file is empty, it's likely an intermediate state (e.g., after touch, before write).
|
||||||
|
// Silently ignore it and wait for a subsequent write event with content.
|
||||||
|
if len(data) == 0 {
|
||||||
|
return nil, nil // Not an error, just nothing to process yet.
|
||||||
|
}
|
||||||
|
|
||||||
|
tokenType := "gemini"
|
||||||
|
typeResult := gjson.GetBytes(data, "type")
|
||||||
|
if typeResult.Exists() {
|
||||||
|
tokenType = typeResult.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if tokenType == "gemini" {
|
||||||
|
var ts gemini.GeminiTokenStorage
|
||||||
|
if err = json.Unmarshal(data, &ts); err == nil {
|
||||||
|
clientCtx := context.Background()
|
||||||
|
geminiAuth := gemini.NewGeminiAuth()
|
||||||
|
httpClient, errGetClient := geminiAuth.GetAuthenticatedClient(clientCtx, &ts, cfg)
|
||||||
|
if errGetClient != nil {
|
||||||
|
return nil, errGetClient
|
||||||
|
}
|
||||||
|
return client.NewGeminiCLIClient(httpClient, &ts, cfg), nil
|
||||||
|
}
|
||||||
|
} else if tokenType == "codex" {
|
||||||
|
var ts codex.CodexTokenStorage
|
||||||
|
if err = json.Unmarshal(data, &ts); err == nil {
|
||||||
|
return client.NewCodexClient(cfg, &ts)
|
||||||
|
}
|
||||||
|
} else if tokenType == "claude" {
|
||||||
|
var ts claude.ClaudeTokenStorage
|
||||||
|
if err = json.Unmarshal(data, &ts); err == nil {
|
||||||
|
return client.NewClaudeClient(cfg, &ts), nil
|
||||||
|
}
|
||||||
|
} else if tokenType == "qwen" {
|
||||||
|
var ts qwen.QwenTokenStorage
|
||||||
|
if err = json.Unmarshal(data, &ts); err == nil {
|
||||||
|
return client.NewQwenClient(cfg, &ts), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// clientsToSlice converts the client map to a slice.
|
||||||
|
func (w *Watcher) clientsToSlice(clientMap map[string]interfaces.Client) []interfaces.Client {
|
||||||
|
s := make([]interfaces.Client, 0, len(clientMap))
|
||||||
|
for _, v := range clientMap {
|
||||||
|
s = append(s, v)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// addOrUpdateClient handles the addition or update of a single client.
|
||||||
|
func (w *Watcher) addOrUpdateClient(path string) {
|
||||||
|
w.clientsMutex.Lock()
|
||||||
|
defer w.clientsMutex.Unlock()
|
||||||
|
|
||||||
|
cfg := w.config
|
||||||
|
if cfg == nil {
|
||||||
|
log.Error("config is nil, cannot add or update client")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unregister old client if it exists
|
||||||
|
if oldClient, ok := w.clients[path]; ok {
|
||||||
|
if u, canUnregister := any(oldClient).(interface{ UnregisterClient() }); canUnregister {
|
||||||
|
log.Debugf("unregistering old client for updated file: %s", filepath.Base(path))
|
||||||
|
u.UnregisterClient()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newClient, err := w.createClientFromFile(path, cfg)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to create/update client for %s: %v", filepath.Base(path), err)
|
||||||
|
// If creation fails, ensure the old client is removed from the map
|
||||||
|
delete(w.clients, path)
|
||||||
|
} else if newClient != nil { // Only update if a client was actually created
|
||||||
|
log.Debugf("successfully created/updated client for %s", filepath.Base(path))
|
||||||
|
w.clients[path] = newClient
|
||||||
|
} else {
|
||||||
|
// This case handles the empty file scenario gracefully
|
||||||
|
log.Debugf("ignoring empty auth file: %s", filepath.Base(path))
|
||||||
|
return // Do not trigger callback for an empty file
|
||||||
|
}
|
||||||
|
|
||||||
|
if w.reloadCallback != nil {
|
||||||
|
log.Debugf("triggering server update callback after add/update")
|
||||||
|
w.reloadCallback(w.clients, cfg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeClient handles the removal of a single client.
|
||||||
|
func (w *Watcher) removeClient(path string) {
|
||||||
|
w.clientsMutex.Lock()
|
||||||
|
defer w.clientsMutex.Unlock()
|
||||||
|
|
||||||
|
cfg := w.config
|
||||||
|
|
||||||
|
// Unregister client if it exists
|
||||||
|
if oldClient, ok := w.clients[path]; ok {
|
||||||
|
if u, canUnregister := any(oldClient).(interface{ UnregisterClient() }); canUnregister {
|
||||||
|
log.Debugf("unregistering client for removed file: %s", filepath.Base(path))
|
||||||
|
u.UnregisterClient()
|
||||||
|
}
|
||||||
|
delete(w.clients, path)
|
||||||
|
log.Debugf("removed client for %s", filepath.Base(path))
|
||||||
|
|
||||||
|
if w.reloadCallback != nil {
|
||||||
|
log.Debugf("triggering server update callback after removal")
|
||||||
|
w.reloadCallback(w.clients, cfg)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user