diff --git a/internal/watcher/clients.go b/internal/watcher/clients.go new file mode 100644 index 00000000..5cd8b6e6 --- /dev/null +++ b/internal/watcher/clients.go @@ -0,0 +1,270 @@ +// clients.go implements watcher client lifecycle logic and persistence helpers. +// It reloads clients, handles incremental auth file changes, and persists updates when supported. +package watcher + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/config" + "github.com/router-for-me/CLIProxyAPI/v6/internal/util" + coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + log "github.com/sirupsen/logrus" +) + +func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string, forceAuthRefresh bool) { + log.Debugf("starting full client load process") + + w.clientsMutex.RLock() + cfg := w.config + w.clientsMutex.RUnlock() + + if cfg == nil { + log.Error("config is nil, cannot reload clients") + return + } + + if len(affectedOAuthProviders) > 0 { + w.clientsMutex.Lock() + if w.currentAuths != nil { + filtered := make(map[string]*coreauth.Auth, len(w.currentAuths)) + for id, auth := range w.currentAuths { + if auth == nil { + continue + } + provider := strings.ToLower(strings.TrimSpace(auth.Provider)) + if _, match := matchProvider(provider, affectedOAuthProviders); match { + continue + } + filtered[id] = auth + } + w.currentAuths = filtered + log.Debugf("applying oauth-excluded-models to providers %v", affectedOAuthProviders) + } else { + w.currentAuths = nil + } + w.clientsMutex.Unlock() + } + + geminiAPIKeyCount, vertexCompatAPIKeyCount, claudeAPIKeyCount, codexAPIKeyCount, openAICompatCount := BuildAPIKeyClients(cfg) + totalAPIKeyClients := geminiAPIKeyCount + vertexCompatAPIKeyCount + claudeAPIKeyCount + codexAPIKeyCount + openAICompatCount + log.Debugf("loaded %d API key clients", totalAPIKeyClients) + + var authFileCount int + if rescanAuth { + authFileCount = w.loadFileClients(cfg) + log.Debugf("loaded %d file-based clients", authFileCount) + } else { + w.clientsMutex.RLock() + authFileCount = len(w.lastAuthHashes) + w.clientsMutex.RUnlock() + log.Debugf("skipping auth directory rescan; retaining %d existing auth files", authFileCount) + } + + if rescanAuth { + w.clientsMutex.Lock() + + w.lastAuthHashes = make(map[string]string) + if resolvedAuthDir, errResolveAuthDir := util.ResolveAuthDir(cfg.AuthDir); errResolveAuthDir != nil { + log.Errorf("failed to resolve auth directory for hash cache: %v", errResolveAuthDir) + } else if resolvedAuthDir != "" { + _ = filepath.Walk(resolvedAuthDir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return nil + } + if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".json") { + if data, errReadFile := os.ReadFile(path); errReadFile == nil && len(data) > 0 { + sum := sha256.Sum256(data) + normalizedPath := w.normalizeAuthPath(path) + w.lastAuthHashes[normalizedPath] = hex.EncodeToString(sum[:]) + } + } + return nil + }) + } + w.clientsMutex.Unlock() + } + + totalNewClients := authFileCount + geminiAPIKeyCount + vertexCompatAPIKeyCount + claudeAPIKeyCount + codexAPIKeyCount + openAICompatCount + + if w.reloadCallback != nil { + log.Debugf("triggering server update callback before auth refresh") + w.reloadCallback(cfg) + } + + w.refreshAuthState(forceAuthRefresh) + + log.Infof("full client load complete - %d clients (%d auth files + %d Gemini API keys + %d Vertex API keys + %d Claude API keys + %d Codex keys + %d OpenAI-compat)", + totalNewClients, + authFileCount, + geminiAPIKeyCount, + vertexCompatAPIKeyCount, + claudeAPIKeyCount, + codexAPIKeyCount, + openAICompatCount, + ) +} + +func (w *Watcher) addOrUpdateClient(path string) { + data, errRead := os.ReadFile(path) + if errRead != nil { + log.Errorf("failed to read auth file %s: %v", filepath.Base(path), errRead) + return + } + if len(data) == 0 { + log.Debugf("ignoring empty auth file: %s", filepath.Base(path)) + return + } + + sum := sha256.Sum256(data) + curHash := hex.EncodeToString(sum[:]) + normalized := w.normalizeAuthPath(path) + + w.clientsMutex.Lock() + + cfg := w.config + if cfg == nil { + log.Error("config is nil, cannot add or update client") + w.clientsMutex.Unlock() + return + } + if prev, ok := w.lastAuthHashes[normalized]; ok && prev == curHash { + log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(path)) + w.clientsMutex.Unlock() + return + } + + w.lastAuthHashes[normalized] = curHash + + w.clientsMutex.Unlock() // Unlock before the callback + + w.refreshAuthState(false) + + if w.reloadCallback != nil { + log.Debugf("triggering server update callback after add/update") + w.reloadCallback(cfg) + } + w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path) +} + +func (w *Watcher) removeClient(path string) { + normalized := w.normalizeAuthPath(path) + w.clientsMutex.Lock() + + cfg := w.config + delete(w.lastAuthHashes, normalized) + + w.clientsMutex.Unlock() // Release the lock before the callback + + w.refreshAuthState(false) + + if w.reloadCallback != nil { + log.Debugf("triggering server update callback after removal") + w.reloadCallback(cfg) + } + w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path) +} + +func (w *Watcher) loadFileClients(cfg *config.Config) int { + authFileCount := 0 + successfulAuthCount := 0 + + authDir, errResolveAuthDir := util.ResolveAuthDir(cfg.AuthDir) + if errResolveAuthDir != nil { + log.Errorf("failed to resolve auth directory: %v", errResolveAuthDir) + return 0 + } + if authDir == "" { + return 0 + } + + errWalk := filepath.Walk(authDir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + log.Debugf("error accessing path %s: %v", path, err) + return err + } + if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".json") { + authFileCount++ + log.Debugf("processing auth file %d: %s", authFileCount, filepath.Base(path)) + if data, errCreate := os.ReadFile(path); errCreate == nil && len(data) > 0 { + successfulAuthCount++ + } + } + return nil + }) + + if errWalk != nil { + log.Errorf("error walking auth directory: %v", errWalk) + } + log.Debugf("auth directory scan complete - found %d .json files, %d readable", authFileCount, successfulAuthCount) + return authFileCount +} + +func BuildAPIKeyClients(cfg *config.Config) (int, int, int, int, int) { + geminiAPIKeyCount := 0 + vertexCompatAPIKeyCount := 0 + claudeAPIKeyCount := 0 + codexAPIKeyCount := 0 + openAICompatCount := 0 + + if len(cfg.GeminiKey) > 0 { + geminiAPIKeyCount += len(cfg.GeminiKey) + } + if len(cfg.VertexCompatAPIKey) > 0 { + vertexCompatAPIKeyCount += len(cfg.VertexCompatAPIKey) + } + if len(cfg.ClaudeKey) > 0 { + claudeAPIKeyCount += len(cfg.ClaudeKey) + } + if len(cfg.CodexKey) > 0 { + codexAPIKeyCount += len(cfg.CodexKey) + } + if len(cfg.OpenAICompatibility) > 0 { + for _, compatConfig := range cfg.OpenAICompatibility { + openAICompatCount += len(compatConfig.APIKeyEntries) + } + } + return geminiAPIKeyCount, vertexCompatAPIKeyCount, claudeAPIKeyCount, codexAPIKeyCount, openAICompatCount +} + +func (w *Watcher) persistConfigAsync() { + if w == nil || w.storePersister == nil { + return + } + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := w.storePersister.PersistConfig(ctx); err != nil { + log.Errorf("failed to persist config change: %v", err) + } + }() +} + +func (w *Watcher) persistAuthAsync(message string, paths ...string) { + if w == nil || w.storePersister == nil { + return + } + filtered := make([]string, 0, len(paths)) + for _, p := range paths { + if trimmed := strings.TrimSpace(p); trimmed != "" { + filtered = append(filtered, trimmed) + } + } + if len(filtered) == 0 { + return + } + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := w.storePersister.PersistAuthFiles(ctx, message, filtered...); err != nil { + log.Errorf("failed to persist auth changes: %v", err) + } + }() +} diff --git a/internal/watcher/config_reload.go b/internal/watcher/config_reload.go new file mode 100644 index 00000000..244f738e --- /dev/null +++ b/internal/watcher/config_reload.go @@ -0,0 +1,134 @@ +// config_reload.go implements debounced configuration hot reload. +// It detects material changes and reloads clients when the config changes. +package watcher + +import ( + "crypto/sha256" + "encoding/hex" + "os" + "time" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/config" + "github.com/router-for-me/CLIProxyAPI/v6/internal/util" + "github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/diff" + "gopkg.in/yaml.v3" + + log "github.com/sirupsen/logrus" +) + +func (w *Watcher) stopConfigReloadTimer() { + w.configReloadMu.Lock() + if w.configReloadTimer != nil { + w.configReloadTimer.Stop() + w.configReloadTimer = nil + } + w.configReloadMu.Unlock() +} + +func (w *Watcher) scheduleConfigReload() { + w.configReloadMu.Lock() + defer w.configReloadMu.Unlock() + if w.configReloadTimer != nil { + w.configReloadTimer.Stop() + } + w.configReloadTimer = time.AfterFunc(configReloadDebounce, func() { + w.configReloadMu.Lock() + w.configReloadTimer = nil + w.configReloadMu.Unlock() + w.reloadConfigIfChanged() + }) +} + +func (w *Watcher) reloadConfigIfChanged() { + data, err := os.ReadFile(w.configPath) + if err != nil { + log.Errorf("failed to read config file for hash check: %v", err) + return + } + if len(data) == 0 { + log.Debugf("ignoring empty config file write event") + return + } + sum := sha256.Sum256(data) + newHash := hex.EncodeToString(sum[:]) + + w.clientsMutex.RLock() + currentHash := w.lastConfigHash + w.clientsMutex.RUnlock() + + if currentHash != "" && currentHash == newHash { + log.Debugf("config file content unchanged (hash match), skipping reload") + return + } + log.Infof("config file changed, reloading: %s", w.configPath) + if w.reloadConfig() { + finalHash := newHash + if updatedData, errRead := os.ReadFile(w.configPath); errRead == nil && len(updatedData) > 0 { + sumUpdated := sha256.Sum256(updatedData) + finalHash = hex.EncodeToString(sumUpdated[:]) + } else if errRead != nil { + log.WithError(errRead).Debug("failed to compute updated config hash after reload") + } + w.clientsMutex.Lock() + w.lastConfigHash = finalHash + w.clientsMutex.Unlock() + w.persistConfigAsync() + } +} + +func (w *Watcher) reloadConfig() bool { + log.Debug("=========================== CONFIG RELOAD ============================") + log.Debugf("starting config reload from: %s", w.configPath) + + newConfig, errLoadConfig := config.LoadConfig(w.configPath) + if errLoadConfig != nil { + log.Errorf("failed to reload config: %v", errLoadConfig) + return false + } + + if w.mirroredAuthDir != "" { + newConfig.AuthDir = w.mirroredAuthDir + } else { + if resolvedAuthDir, errResolveAuthDir := util.ResolveAuthDir(newConfig.AuthDir); errResolveAuthDir != nil { + log.Errorf("failed to resolve auth directory from config: %v", errResolveAuthDir) + } else { + newConfig.AuthDir = resolvedAuthDir + } + } + + w.clientsMutex.Lock() + var oldConfig *config.Config + _ = yaml.Unmarshal(w.oldConfigYaml, &oldConfig) + w.oldConfigYaml, _ = yaml.Marshal(newConfig) + w.config = newConfig + w.clientsMutex.Unlock() + + var affectedOAuthProviders []string + if oldConfig != nil { + _, affectedOAuthProviders = diff.DiffOAuthExcludedModelChanges(oldConfig.OAuthExcludedModels, newConfig.OAuthExcludedModels) + } + + util.SetLogLevel(newConfig) + if oldConfig != nil && oldConfig.Debug != newConfig.Debug { + log.Debugf("log level updated - debug mode changed from %t to %t", oldConfig.Debug, newConfig.Debug) + } + + if oldConfig != nil { + details := diff.BuildConfigChangeDetails(oldConfig, newConfig) + if len(details) > 0 { + log.Debugf("config changes detected:") + for _, d := range details { + log.Debugf(" %s", d) + } + } else { + log.Debugf("no material config field changes detected") + } + } + + authDirChanged := oldConfig == nil || oldConfig.AuthDir != newConfig.AuthDir + forceAuthRefresh := oldConfig != nil && oldConfig.ForceModelPrefix != newConfig.ForceModelPrefix + + log.Infof("config successfully reloaded, triggering client reload") + w.reloadClients(authDirChanged, affectedOAuthProviders, forceAuthRefresh) + return true +} diff --git a/internal/watcher/dispatcher.go b/internal/watcher/dispatcher.go new file mode 100644 index 00000000..ff3c5b63 --- /dev/null +++ b/internal/watcher/dispatcher.go @@ -0,0 +1,273 @@ +// dispatcher.go implements auth update dispatching and queue management. +// It batches, deduplicates, and delivers auth updates to registered consumers. +package watcher + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/config" + "github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/synthesizer" + coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" +) + +func (w *Watcher) setAuthUpdateQueue(queue chan<- AuthUpdate) { + w.clientsMutex.Lock() + defer w.clientsMutex.Unlock() + w.authQueue = queue + if w.dispatchCond == nil { + w.dispatchCond = sync.NewCond(&w.dispatchMu) + } + if w.dispatchCancel != nil { + w.dispatchCancel() + if w.dispatchCond != nil { + w.dispatchMu.Lock() + w.dispatchCond.Broadcast() + w.dispatchMu.Unlock() + } + w.dispatchCancel = nil + } + if queue != nil { + ctx, cancel := context.WithCancel(context.Background()) + w.dispatchCancel = cancel + go w.dispatchLoop(ctx) + } +} + +func (w *Watcher) dispatchRuntimeAuthUpdate(update AuthUpdate) bool { + if w == nil { + return false + } + w.clientsMutex.Lock() + if w.runtimeAuths == nil { + w.runtimeAuths = make(map[string]*coreauth.Auth) + } + switch update.Action { + case AuthUpdateActionAdd, AuthUpdateActionModify: + if update.Auth != nil && update.Auth.ID != "" { + clone := update.Auth.Clone() + w.runtimeAuths[clone.ID] = clone + if w.currentAuths == nil { + w.currentAuths = make(map[string]*coreauth.Auth) + } + w.currentAuths[clone.ID] = clone.Clone() + } + case AuthUpdateActionDelete: + id := update.ID + if id == "" && update.Auth != nil { + id = update.Auth.ID + } + if id != "" { + delete(w.runtimeAuths, id) + if w.currentAuths != nil { + delete(w.currentAuths, id) + } + } + } + w.clientsMutex.Unlock() + if w.getAuthQueue() == nil { + return false + } + w.dispatchAuthUpdates([]AuthUpdate{update}) + return true +} + +func (w *Watcher) refreshAuthState(force bool) { + auths := w.SnapshotCoreAuths() + w.clientsMutex.Lock() + if len(w.runtimeAuths) > 0 { + for _, a := range w.runtimeAuths { + if a != nil { + auths = append(auths, a.Clone()) + } + } + } + updates := w.prepareAuthUpdatesLocked(auths, force) + w.clientsMutex.Unlock() + w.dispatchAuthUpdates(updates) +} + +func (w *Watcher) prepareAuthUpdatesLocked(auths []*coreauth.Auth, force bool) []AuthUpdate { + newState := make(map[string]*coreauth.Auth, len(auths)) + for _, auth := range auths { + if auth == nil || auth.ID == "" { + continue + } + newState[auth.ID] = auth.Clone() + } + if w.currentAuths == nil { + w.currentAuths = newState + if w.authQueue == nil { + return nil + } + updates := make([]AuthUpdate, 0, len(newState)) + for id, auth := range newState { + updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()}) + } + return updates + } + if w.authQueue == nil { + w.currentAuths = newState + return nil + } + updates := make([]AuthUpdate, 0, len(newState)+len(w.currentAuths)) + for id, auth := range newState { + if existing, ok := w.currentAuths[id]; !ok { + updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()}) + } else if force || !authEqual(existing, auth) { + updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: auth.Clone()}) + } + } + for id := range w.currentAuths { + if _, ok := newState[id]; !ok { + updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id}) + } + } + w.currentAuths = newState + return updates +} + +func (w *Watcher) dispatchAuthUpdates(updates []AuthUpdate) { + if len(updates) == 0 { + return + } + queue := w.getAuthQueue() + if queue == nil { + return + } + baseTS := time.Now().UnixNano() + w.dispatchMu.Lock() + if w.pendingUpdates == nil { + w.pendingUpdates = make(map[string]AuthUpdate) + } + for idx, update := range updates { + key := w.authUpdateKey(update, baseTS+int64(idx)) + if _, exists := w.pendingUpdates[key]; !exists { + w.pendingOrder = append(w.pendingOrder, key) + } + w.pendingUpdates[key] = update + } + if w.dispatchCond != nil { + w.dispatchCond.Signal() + } + w.dispatchMu.Unlock() +} + +func (w *Watcher) authUpdateKey(update AuthUpdate, ts int64) string { + if update.ID != "" { + return update.ID + } + return fmt.Sprintf("%s:%d", update.Action, ts) +} + +func (w *Watcher) dispatchLoop(ctx context.Context) { + for { + batch, ok := w.nextPendingBatch(ctx) + if !ok { + return + } + queue := w.getAuthQueue() + if queue == nil { + if ctx.Err() != nil { + return + } + time.Sleep(10 * time.Millisecond) + continue + } + for _, update := range batch { + select { + case queue <- update: + case <-ctx.Done(): + return + } + } + } +} + +func (w *Watcher) nextPendingBatch(ctx context.Context) ([]AuthUpdate, bool) { + w.dispatchMu.Lock() + defer w.dispatchMu.Unlock() + for len(w.pendingOrder) == 0 { + if ctx.Err() != nil { + return nil, false + } + w.dispatchCond.Wait() + if ctx.Err() != nil { + return nil, false + } + } + batch := make([]AuthUpdate, 0, len(w.pendingOrder)) + for _, key := range w.pendingOrder { + batch = append(batch, w.pendingUpdates[key]) + delete(w.pendingUpdates, key) + } + w.pendingOrder = w.pendingOrder[:0] + return batch, true +} + +func (w *Watcher) getAuthQueue() chan<- AuthUpdate { + w.clientsMutex.RLock() + defer w.clientsMutex.RUnlock() + return w.authQueue +} + +func (w *Watcher) stopDispatch() { + if w.dispatchCancel != nil { + w.dispatchCancel() + w.dispatchCancel = nil + } + w.dispatchMu.Lock() + w.pendingOrder = nil + w.pendingUpdates = nil + if w.dispatchCond != nil { + w.dispatchCond.Broadcast() + } + w.dispatchMu.Unlock() + w.clientsMutex.Lock() + w.authQueue = nil + w.clientsMutex.Unlock() +} + +func authEqual(a, b *coreauth.Auth) bool { + return reflect.DeepEqual(normalizeAuth(a), normalizeAuth(b)) +} + +func normalizeAuth(a *coreauth.Auth) *coreauth.Auth { + if a == nil { + return nil + } + clone := a.Clone() + clone.CreatedAt = time.Time{} + clone.UpdatedAt = time.Time{} + clone.LastRefreshedAt = time.Time{} + clone.NextRefreshAfter = time.Time{} + clone.Runtime = nil + clone.Quota.NextRecoverAt = time.Time{} + return clone +} + +func snapshotCoreAuths(cfg *config.Config, authDir string) []*coreauth.Auth { + ctx := &synthesizer.SynthesisContext{ + Config: cfg, + AuthDir: authDir, + Now: time.Now(), + IDGenerator: synthesizer.NewStableIDGenerator(), + } + + var out []*coreauth.Auth + + configSynth := synthesizer.NewConfigSynthesizer() + if auths, err := configSynth.Synthesize(ctx); err == nil { + out = append(out, auths...) + } + + fileSynth := synthesizer.NewFileSynthesizer() + if auths, err := fileSynth.Synthesize(ctx); err == nil { + out = append(out, auths...) + } + + return out +} diff --git a/internal/watcher/events.go b/internal/watcher/events.go new file mode 100644 index 00000000..250cf75c --- /dev/null +++ b/internal/watcher/events.go @@ -0,0 +1,194 @@ +// events.go implements fsnotify event handling for config and auth file changes. +// It normalizes paths, debounces noisy events, and triggers reload/update logic. +package watcher + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "os" + "path/filepath" + "runtime" + "strings" + "time" + + "github.com/fsnotify/fsnotify" + log "github.com/sirupsen/logrus" +) + +func matchProvider(provider string, targets []string) (string, bool) { + p := strings.ToLower(strings.TrimSpace(provider)) + for _, t := range targets { + if strings.EqualFold(p, strings.TrimSpace(t)) { + return p, true + } + } + return p, false +} + +func (w *Watcher) start(ctx context.Context) error { + if errAddConfig := w.watcher.Add(w.configPath); errAddConfig != nil { + log.Errorf("failed to watch config file %s: %v", w.configPath, errAddConfig) + return errAddConfig + } + log.Debugf("watching config file: %s", w.configPath) + + if errAddAuthDir := w.watcher.Add(w.authDir); errAddAuthDir != nil { + log.Errorf("failed to watch auth directory %s: %v", w.authDir, errAddAuthDir) + return errAddAuthDir + } + log.Debugf("watching auth directory: %s", w.authDir) + + go w.processEvents(ctx) + + w.reloadClients(true, nil, false) + return nil +} + +func (w *Watcher) processEvents(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case event, ok := <-w.watcher.Events: + if !ok { + return + } + w.handleEvent(event) + case errWatch, ok := <-w.watcher.Errors: + if !ok { + return + } + log.Errorf("file watcher error: %v", errWatch) + } + } +} + +func (w *Watcher) handleEvent(event fsnotify.Event) { + // Filter only relevant events: config file or auth-dir JSON files. + configOps := fsnotify.Write | fsnotify.Create | fsnotify.Rename + normalizedName := w.normalizeAuthPath(event.Name) + normalizedConfigPath := w.normalizeAuthPath(w.configPath) + normalizedAuthDir := w.normalizeAuthPath(w.authDir) + isConfigEvent := normalizedName == normalizedConfigPath && event.Op&configOps != 0 + authOps := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename + isAuthJSON := strings.HasPrefix(normalizedName, normalizedAuthDir) && strings.HasSuffix(normalizedName, ".json") && event.Op&authOps != 0 + if !isConfigEvent && !isAuthJSON { + // Ignore unrelated files (e.g., cookie snapshots *.cookie) and other noise. + return + } + + now := time.Now() + log.Debugf("file system event detected: %s %s", event.Op.String(), event.Name) + + // Handle config file changes + if isConfigEvent { + log.Debugf("config file change details - operation: %s, timestamp: %s", event.Op.String(), now.Format("2006-01-02 15:04:05.000")) + w.scheduleConfigReload() + return + } + + // Handle auth directory changes incrementally (.json only) + if event.Op&(fsnotify.Remove|fsnotify.Rename) != 0 { + if w.shouldDebounceRemove(normalizedName, now) { + log.Debugf("debouncing remove event for %s", filepath.Base(event.Name)) + return + } + // Atomic replace on some platforms may surface as Rename (or Remove) before the new file is ready. + // Wait briefly; if the path exists again, treat as an update instead of removal. + time.Sleep(replaceCheckDelay) + if _, statErr := os.Stat(event.Name); statErr == nil { + if unchanged, errSame := w.authFileUnchanged(event.Name); errSame == nil && unchanged { + log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(event.Name)) + return + } + log.Infof("auth file changed (%s): %s, processing incrementally", event.Op.String(), filepath.Base(event.Name)) + w.addOrUpdateClient(event.Name) + return + } + if !w.isKnownAuthFile(event.Name) { + log.Debugf("ignoring remove for unknown auth file: %s", filepath.Base(event.Name)) + return + } + log.Infof("auth file changed (%s): %s, processing incrementally", event.Op.String(), filepath.Base(event.Name)) + w.removeClient(event.Name) + return + } + if event.Op&(fsnotify.Create|fsnotify.Write) != 0 { + if unchanged, errSame := w.authFileUnchanged(event.Name); errSame == nil && unchanged { + log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(event.Name)) + return + } + log.Infof("auth file changed (%s): %s, processing incrementally", event.Op.String(), filepath.Base(event.Name)) + w.addOrUpdateClient(event.Name) + } +} + +func (w *Watcher) authFileUnchanged(path string) (bool, error) { + data, errRead := os.ReadFile(path) + if errRead != nil { + return false, errRead + } + if len(data) == 0 { + return false, nil + } + sum := sha256.Sum256(data) + curHash := hex.EncodeToString(sum[:]) + + normalized := w.normalizeAuthPath(path) + w.clientsMutex.RLock() + prevHash, ok := w.lastAuthHashes[normalized] + w.clientsMutex.RUnlock() + if ok && prevHash == curHash { + return true, nil + } + return false, nil +} + +func (w *Watcher) isKnownAuthFile(path string) bool { + normalized := w.normalizeAuthPath(path) + w.clientsMutex.RLock() + defer w.clientsMutex.RUnlock() + _, ok := w.lastAuthHashes[normalized] + return ok +} + +func (w *Watcher) normalizeAuthPath(path string) string { + trimmed := strings.TrimSpace(path) + if trimmed == "" { + return "" + } + cleaned := filepath.Clean(trimmed) + if runtime.GOOS == "windows" { + cleaned = strings.TrimPrefix(cleaned, `\\?\`) + cleaned = strings.ToLower(cleaned) + } + return cleaned +} + +func (w *Watcher) shouldDebounceRemove(normalizedPath string, now time.Time) bool { + if normalizedPath == "" { + return false + } + w.clientsMutex.Lock() + if w.lastRemoveTimes == nil { + w.lastRemoveTimes = make(map[string]time.Time) + } + if last, ok := w.lastRemoveTimes[normalizedPath]; ok { + if now.Sub(last) < authRemoveDebounceWindow { + w.clientsMutex.Unlock() + return true + } + } + w.lastRemoveTimes[normalizedPath] = now + if len(w.lastRemoveTimes) > 128 { + cutoff := now.Add(-2 * authRemoveDebounceWindow) + for p, t := range w.lastRemoveTimes { + if t.Before(cutoff) { + delete(w.lastRemoveTimes, p) + } + } + } + w.clientsMutex.Unlock() + return false +} diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 61e5b378..77006cf8 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -1,45 +1,22 @@ -// Package watcher provides file system monitoring functionality for the CLI Proxy API. -// It watches configuration files and authentication directories for changes, -// automatically reloading clients and configuration when files are modified. -// The package handles cross-platform file system events and supports hot-reloading. +// Package watcher watches config/auth files and triggers hot reloads. +// It supports cross-platform fsnotify event handling. package watcher import ( "context" - "crypto/sha256" - "encoding/hex" - "fmt" - "io/fs" - "os" - "path/filepath" - "reflect" - "runtime" "strings" "sync" "time" "github.com/fsnotify/fsnotify" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" - "github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/diff" - "github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/synthesizer" "gopkg.in/yaml.v3" - "github.com/router-for-me/CLIProxyAPI/v6/internal/util" sdkAuth "github.com/router-for-me/CLIProxyAPI/v6/sdk/auth" coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" log "github.com/sirupsen/logrus" ) -func matchProvider(provider string, targets []string) (string, bool) { - p := strings.ToLower(strings.TrimSpace(provider)) - for _, t := range targets { - if strings.EqualFold(p, strings.TrimSpace(t)) { - return p, true - } - } - return p, false -} - // storePersister captures persistence-capable token store methods used by the watcher. type storePersister interface { PersistConfig(ctx context.Context) error @@ -131,26 +108,7 @@ func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config)) // Start begins watching the configuration file and authentication directory func (w *Watcher) Start(ctx context.Context) error { - // Watch the config file - if errAddConfig := w.watcher.Add(w.configPath); errAddConfig != nil { - log.Errorf("failed to watch config file %s: %v", w.configPath, errAddConfig) - return errAddConfig - } - log.Debugf("watching config file: %s", w.configPath) - - // Watch the auth directory - if errAddAuthDir := w.watcher.Add(w.authDir); errAddAuthDir != nil { - log.Errorf("failed to watch auth directory %s: %v", w.authDir, errAddAuthDir) - return errAddAuthDir - } - log.Debugf("watching auth directory: %s", w.authDir) - - // Start the event processing goroutine - go w.processEvents(ctx) - - // Perform an initial full reload based on current config and auth dir - w.reloadClients(true, nil, false) - return nil + return w.start(ctx) } // Stop stops the file watcher @@ -160,15 +118,6 @@ func (w *Watcher) Stop() error { return w.watcher.Close() } -func (w *Watcher) stopConfigReloadTimer() { - w.configReloadMu.Lock() - if w.configReloadTimer != nil { - w.configReloadTimer.Stop() - w.configReloadTimer = nil - } - w.configReloadMu.Unlock() -} - // SetConfig updates the current configuration func (w *Watcher) SetConfig(cfg *config.Config) { w.clientsMutex.Lock() @@ -179,818 +128,20 @@ func (w *Watcher) SetConfig(cfg *config.Config) { // SetAuthUpdateQueue sets the queue used to emit auth updates. func (w *Watcher) SetAuthUpdateQueue(queue chan<- AuthUpdate) { - w.clientsMutex.Lock() - defer w.clientsMutex.Unlock() - w.authQueue = queue - if w.dispatchCond == nil { - w.dispatchCond = sync.NewCond(&w.dispatchMu) - } - if w.dispatchCancel != nil { - w.dispatchCancel() - if w.dispatchCond != nil { - w.dispatchMu.Lock() - w.dispatchCond.Broadcast() - w.dispatchMu.Unlock() - } - w.dispatchCancel = nil - } - if queue != nil { - ctx, cancel := context.WithCancel(context.Background()) - w.dispatchCancel = cancel - go w.dispatchLoop(ctx) - } + w.setAuthUpdateQueue(queue) } // DispatchRuntimeAuthUpdate allows external runtime providers (e.g., websocket-driven auths) // to push auth updates through the same queue used by file/config watchers. // Returns true if the update was enqueued; false if no queue is configured. func (w *Watcher) DispatchRuntimeAuthUpdate(update AuthUpdate) bool { - if w == nil { - return false - } - w.clientsMutex.Lock() - if w.runtimeAuths == nil { - w.runtimeAuths = make(map[string]*coreauth.Auth) - } - switch update.Action { - case AuthUpdateActionAdd, AuthUpdateActionModify: - if update.Auth != nil && update.Auth.ID != "" { - clone := update.Auth.Clone() - w.runtimeAuths[clone.ID] = clone - if w.currentAuths == nil { - w.currentAuths = make(map[string]*coreauth.Auth) - } - w.currentAuths[clone.ID] = clone.Clone() - } - case AuthUpdateActionDelete: - id := update.ID - if id == "" && update.Auth != nil { - id = update.Auth.ID - } - if id != "" { - delete(w.runtimeAuths, id) - if w.currentAuths != nil { - delete(w.currentAuths, id) - } - } - } - w.clientsMutex.Unlock() - if w.getAuthQueue() == nil { - return false - } - w.dispatchAuthUpdates([]AuthUpdate{update}) - return true + return w.dispatchRuntimeAuthUpdate(update) } -func (w *Watcher) refreshAuthState(force bool) { - auths := w.SnapshotCoreAuths() - w.clientsMutex.Lock() - if len(w.runtimeAuths) > 0 { - for _, a := range w.runtimeAuths { - if a != nil { - auths = append(auths, a.Clone()) - } - } - } - updates := w.prepareAuthUpdatesLocked(auths, force) - w.clientsMutex.Unlock() - w.dispatchAuthUpdates(updates) -} - -func (w *Watcher) prepareAuthUpdatesLocked(auths []*coreauth.Auth, force bool) []AuthUpdate { - newState := make(map[string]*coreauth.Auth, len(auths)) - for _, auth := range auths { - if auth == nil || auth.ID == "" { - continue - } - newState[auth.ID] = auth.Clone() - } - if w.currentAuths == nil { - w.currentAuths = newState - if w.authQueue == nil { - return nil - } - updates := make([]AuthUpdate, 0, len(newState)) - for id, auth := range newState { - updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()}) - } - return updates - } - if w.authQueue == nil { - w.currentAuths = newState - return nil - } - updates := make([]AuthUpdate, 0, len(newState)+len(w.currentAuths)) - for id, auth := range newState { - if existing, ok := w.currentAuths[id]; !ok { - updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()}) - } else if force || !authEqual(existing, auth) { - updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: auth.Clone()}) - } - } - for id := range w.currentAuths { - if _, ok := newState[id]; !ok { - updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id}) - } - } - w.currentAuths = newState - return updates -} - -func (w *Watcher) dispatchAuthUpdates(updates []AuthUpdate) { - if len(updates) == 0 { - return - } - queue := w.getAuthQueue() - if queue == nil { - return - } - baseTS := time.Now().UnixNano() - w.dispatchMu.Lock() - if w.pendingUpdates == nil { - w.pendingUpdates = make(map[string]AuthUpdate) - } - for idx, update := range updates { - key := w.authUpdateKey(update, baseTS+int64(idx)) - if _, exists := w.pendingUpdates[key]; !exists { - w.pendingOrder = append(w.pendingOrder, key) - } - w.pendingUpdates[key] = update - } - if w.dispatchCond != nil { - w.dispatchCond.Signal() - } - w.dispatchMu.Unlock() -} - -func (w *Watcher) authUpdateKey(update AuthUpdate, ts int64) string { - if update.ID != "" { - return update.ID - } - return fmt.Sprintf("%s:%d", update.Action, ts) -} - -func (w *Watcher) dispatchLoop(ctx context.Context) { - for { - batch, ok := w.nextPendingBatch(ctx) - if !ok { - return - } - queue := w.getAuthQueue() - if queue == nil { - if ctx.Err() != nil { - return - } - time.Sleep(10 * time.Millisecond) - continue - } - for _, update := range batch { - select { - case queue <- update: - case <-ctx.Done(): - return - } - } - } -} - -func (w *Watcher) nextPendingBatch(ctx context.Context) ([]AuthUpdate, bool) { - w.dispatchMu.Lock() - defer w.dispatchMu.Unlock() - for len(w.pendingOrder) == 0 { - if ctx.Err() != nil { - return nil, false - } - w.dispatchCond.Wait() - if ctx.Err() != nil { - return nil, false - } - } - batch := make([]AuthUpdate, 0, len(w.pendingOrder)) - for _, key := range w.pendingOrder { - batch = append(batch, w.pendingUpdates[key]) - delete(w.pendingUpdates, key) - } - w.pendingOrder = w.pendingOrder[:0] - return batch, true -} - -func (w *Watcher) getAuthQueue() chan<- AuthUpdate { - w.clientsMutex.RLock() - defer w.clientsMutex.RUnlock() - return w.authQueue -} - -func (w *Watcher) stopDispatch() { - if w.dispatchCancel != nil { - w.dispatchCancel() - w.dispatchCancel = nil - } - w.dispatchMu.Lock() - w.pendingOrder = nil - w.pendingUpdates = nil - if w.dispatchCond != nil { - w.dispatchCond.Broadcast() - } - w.dispatchMu.Unlock() - w.clientsMutex.Lock() - w.authQueue = nil - w.clientsMutex.Unlock() -} - -func (w *Watcher) persistConfigAsync() { - if w == nil || w.storePersister == nil { - return - } - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - if err := w.storePersister.PersistConfig(ctx); err != nil { - log.Errorf("failed to persist config change: %v", err) - } - }() -} - -func (w *Watcher) persistAuthAsync(message string, paths ...string) { - if w == nil || w.storePersister == nil { - return - } - filtered := make([]string, 0, len(paths)) - for _, p := range paths { - if trimmed := strings.TrimSpace(p); trimmed != "" { - filtered = append(filtered, trimmed) - } - } - if len(filtered) == 0 { - return - } - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - if err := w.storePersister.PersistAuthFiles(ctx, message, filtered...); err != nil { - log.Errorf("failed to persist auth changes: %v", err) - } - }() -} - -func authEqual(a, b *coreauth.Auth) bool { - return reflect.DeepEqual(normalizeAuth(a), normalizeAuth(b)) -} - -func normalizeAuth(a *coreauth.Auth) *coreauth.Auth { - if a == nil { - return nil - } - clone := a.Clone() - clone.CreatedAt = time.Time{} - clone.UpdatedAt = time.Time{} - clone.LastRefreshedAt = time.Time{} - clone.NextRefreshAfter = time.Time{} - clone.Runtime = nil - clone.Quota.NextRecoverAt = time.Time{} - return clone -} - -// SetClients sets the file-based clients. -// SetClients removed -// SetAPIKeyClients removed - -// processEvents handles file system events -func (w *Watcher) processEvents(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case event, ok := <-w.watcher.Events: - if !ok { - return - } - w.handleEvent(event) - case errWatch, ok := <-w.watcher.Errors: - if !ok { - return - } - log.Errorf("file watcher error: %v", errWatch) - } - } -} - -func (w *Watcher) authFileUnchanged(path string) (bool, error) { - data, errRead := os.ReadFile(path) - if errRead != nil { - return false, errRead - } - if len(data) == 0 { - return false, nil - } - sum := sha256.Sum256(data) - curHash := hex.EncodeToString(sum[:]) - - normalized := w.normalizeAuthPath(path) - w.clientsMutex.RLock() - prevHash, ok := w.lastAuthHashes[normalized] - w.clientsMutex.RUnlock() - if ok && prevHash == curHash { - return true, nil - } - return false, nil -} - -func (w *Watcher) isKnownAuthFile(path string) bool { - normalized := w.normalizeAuthPath(path) - w.clientsMutex.RLock() - defer w.clientsMutex.RUnlock() - _, ok := w.lastAuthHashes[normalized] - return ok -} - -func (w *Watcher) normalizeAuthPath(path string) string { - trimmed := strings.TrimSpace(path) - if trimmed == "" { - return "" - } - cleaned := filepath.Clean(trimmed) - if runtime.GOOS == "windows" { - cleaned = strings.TrimPrefix(cleaned, `\\?\`) - cleaned = strings.ToLower(cleaned) - } - return cleaned -} - -func (w *Watcher) shouldDebounceRemove(normalizedPath string, now time.Time) bool { - if normalizedPath == "" { - return false - } - w.clientsMutex.Lock() - if w.lastRemoveTimes == nil { - w.lastRemoveTimes = make(map[string]time.Time) - } - if last, ok := w.lastRemoveTimes[normalizedPath]; ok { - if now.Sub(last) < authRemoveDebounceWindow { - w.clientsMutex.Unlock() - return true - } - } - w.lastRemoveTimes[normalizedPath] = now - if len(w.lastRemoveTimes) > 128 { - cutoff := now.Add(-2 * authRemoveDebounceWindow) - for p, t := range w.lastRemoveTimes { - if t.Before(cutoff) { - delete(w.lastRemoveTimes, p) - } - } - } - w.clientsMutex.Unlock() - return false -} - -// handleEvent processes individual file system events -func (w *Watcher) handleEvent(event fsnotify.Event) { - // Filter only relevant events: config file or auth-dir JSON files. - configOps := fsnotify.Write | fsnotify.Create | fsnotify.Rename - normalizedName := w.normalizeAuthPath(event.Name) - normalizedConfigPath := w.normalizeAuthPath(w.configPath) - normalizedAuthDir := w.normalizeAuthPath(w.authDir) - isConfigEvent := normalizedName == normalizedConfigPath && event.Op&configOps != 0 - authOps := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename - isAuthJSON := strings.HasPrefix(normalizedName, normalizedAuthDir) && strings.HasSuffix(normalizedName, ".json") && event.Op&authOps != 0 - if !isConfigEvent && !isAuthJSON { - // Ignore unrelated files (e.g., cookie snapshots *.cookie) and other noise. - return - } - - now := time.Now() - log.Debugf("file system event detected: %s %s", event.Op.String(), event.Name) - - // Handle config file changes - if isConfigEvent { - log.Debugf("config file change details - operation: %s, timestamp: %s", event.Op.String(), now.Format("2006-01-02 15:04:05.000")) - w.scheduleConfigReload() - return - } - - // Handle auth directory changes incrementally (.json only) - if event.Op&(fsnotify.Remove|fsnotify.Rename) != 0 { - if w.shouldDebounceRemove(normalizedName, now) { - log.Debugf("debouncing remove event for %s", filepath.Base(event.Name)) - return - } - // Atomic replace on some platforms may surface as Rename (or Remove) before the new file is ready. - // Wait briefly; if the path exists again, treat as an update instead of removal. - time.Sleep(replaceCheckDelay) - if _, statErr := os.Stat(event.Name); statErr == nil { - if unchanged, errSame := w.authFileUnchanged(event.Name); errSame == nil && unchanged { - log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(event.Name)) - return - } - log.Infof("auth file changed (%s): %s, processing incrementally", event.Op.String(), filepath.Base(event.Name)) - w.addOrUpdateClient(event.Name) - return - } - if !w.isKnownAuthFile(event.Name) { - log.Debugf("ignoring remove for unknown auth file: %s", filepath.Base(event.Name)) - return - } - log.Infof("auth file changed (%s): %s, processing incrementally", event.Op.String(), filepath.Base(event.Name)) - w.removeClient(event.Name) - return - } - if event.Op&(fsnotify.Create|fsnotify.Write) != 0 { - if unchanged, errSame := w.authFileUnchanged(event.Name); errSame == nil && unchanged { - log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(event.Name)) - return - } - log.Infof("auth file changed (%s): %s, processing incrementally", event.Op.String(), filepath.Base(event.Name)) - w.addOrUpdateClient(event.Name) - } -} - -func (w *Watcher) scheduleConfigReload() { - w.configReloadMu.Lock() - defer w.configReloadMu.Unlock() - if w.configReloadTimer != nil { - w.configReloadTimer.Stop() - } - w.configReloadTimer = time.AfterFunc(configReloadDebounce, func() { - w.configReloadMu.Lock() - w.configReloadTimer = nil - w.configReloadMu.Unlock() - w.reloadConfigIfChanged() - }) -} - -func (w *Watcher) reloadConfigIfChanged() { - data, err := os.ReadFile(w.configPath) - if err != nil { - log.Errorf("failed to read config file for hash check: %v", err) - return - } - if len(data) == 0 { - log.Debugf("ignoring empty config file write event") - return - } - sum := sha256.Sum256(data) - newHash := hex.EncodeToString(sum[:]) - - w.clientsMutex.RLock() - currentHash := w.lastConfigHash - w.clientsMutex.RUnlock() - - if currentHash != "" && currentHash == newHash { - log.Debugf("config file content unchanged (hash match), skipping reload") - return - } - log.Infof("config file changed, reloading: %s", w.configPath) - if w.reloadConfig() { - finalHash := newHash - if updatedData, errRead := os.ReadFile(w.configPath); errRead == nil && len(updatedData) > 0 { - sumUpdated := sha256.Sum256(updatedData) - finalHash = hex.EncodeToString(sumUpdated[:]) - } else if errRead != nil { - log.WithError(errRead).Debug("failed to compute updated config hash after reload") - } - w.clientsMutex.Lock() - w.lastConfigHash = finalHash - w.clientsMutex.Unlock() - w.persistConfigAsync() - } -} - -// reloadConfig reloads the configuration and triggers a full reload -func (w *Watcher) reloadConfig() bool { - log.Debug("=========================== CONFIG RELOAD ============================") - log.Debugf("starting config reload from: %s", w.configPath) - - newConfig, errLoadConfig := config.LoadConfig(w.configPath) - if errLoadConfig != nil { - log.Errorf("failed to reload config: %v", errLoadConfig) - return false - } - - if w.mirroredAuthDir != "" { - newConfig.AuthDir = w.mirroredAuthDir - } else { - if resolvedAuthDir, errResolveAuthDir := util.ResolveAuthDir(newConfig.AuthDir); errResolveAuthDir != nil { - log.Errorf("failed to resolve auth directory from config: %v", errResolveAuthDir) - } else { - newConfig.AuthDir = resolvedAuthDir - } - } - - w.clientsMutex.Lock() - var oldConfig *config.Config - _ = yaml.Unmarshal(w.oldConfigYaml, &oldConfig) - w.oldConfigYaml, _ = yaml.Marshal(newConfig) - w.config = newConfig - w.clientsMutex.Unlock() - - var affectedOAuthProviders []string - if oldConfig != nil { - _, affectedOAuthProviders = diff.DiffOAuthExcludedModelChanges(oldConfig.OAuthExcludedModels, newConfig.OAuthExcludedModels) - } - - // Always apply the current log level based on the latest config. - // This ensures logrus reflects the desired level even if change detection misses. - util.SetLogLevel(newConfig) - // Additional debug for visibility when the flag actually changes. - if oldConfig != nil && oldConfig.Debug != newConfig.Debug { - log.Debugf("log level updated - debug mode changed from %t to %t", oldConfig.Debug, newConfig.Debug) - } - - // Log configuration changes in debug mode, only when there are material diffs - if oldConfig != nil { - details := diff.BuildConfigChangeDetails(oldConfig, newConfig) - if len(details) > 0 { - log.Debugf("config changes detected:") - for _, d := range details { - log.Debugf(" %s", d) - } - } else { - log.Debugf("no material config field changes detected") - } - } - - authDirChanged := oldConfig == nil || oldConfig.AuthDir != newConfig.AuthDir - forceAuthRefresh := oldConfig != nil && oldConfig.ForceModelPrefix != newConfig.ForceModelPrefix - - log.Infof("config successfully reloaded, triggering client reload") - // Reload clients with new config - w.reloadClients(authDirChanged, affectedOAuthProviders, forceAuthRefresh) - return true -} - -// reloadClients performs a full scan and reload of all clients. -func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string, forceAuthRefresh bool) { - log.Debugf("starting full client load process") - - w.clientsMutex.RLock() - cfg := w.config - w.clientsMutex.RUnlock() - - if cfg == nil { - log.Error("config is nil, cannot reload clients") - return - } - - if len(affectedOAuthProviders) > 0 { - w.clientsMutex.Lock() - if w.currentAuths != nil { - filtered := make(map[string]*coreauth.Auth, len(w.currentAuths)) - for id, auth := range w.currentAuths { - if auth == nil { - continue - } - provider := strings.ToLower(strings.TrimSpace(auth.Provider)) - if _, match := matchProvider(provider, affectedOAuthProviders); match { - continue - } - filtered[id] = auth - } - w.currentAuths = filtered - log.Debugf("applying oauth-excluded-models to providers %v", affectedOAuthProviders) - } else { - w.currentAuths = nil - } - w.clientsMutex.Unlock() - } - - // Unregister all old API key clients before creating new ones - // no legacy clients to unregister - - // Create new API key clients based on the new config - geminiAPIKeyCount, vertexCompatAPIKeyCount, claudeAPIKeyCount, codexAPIKeyCount, openAICompatCount := BuildAPIKeyClients(cfg) - totalAPIKeyClients := geminiAPIKeyCount + vertexCompatAPIKeyCount + claudeAPIKeyCount + codexAPIKeyCount + openAICompatCount - log.Debugf("loaded %d API key clients", totalAPIKeyClients) - - var authFileCount int - if rescanAuth { - // Load file-based clients when explicitly requested (startup or authDir change) - authFileCount = w.loadFileClients(cfg) - log.Debugf("loaded %d file-based clients", authFileCount) - } else { - // Preserve existing auth hashes and only report current known count to avoid redundant scans. - w.clientsMutex.RLock() - authFileCount = len(w.lastAuthHashes) - w.clientsMutex.RUnlock() - log.Debugf("skipping auth directory rescan; retaining %d existing auth files", authFileCount) - } - - // no legacy file-based clients to unregister - - // Update client maps - if rescanAuth { - w.clientsMutex.Lock() - - // Rebuild auth file hash cache for current clients - w.lastAuthHashes = make(map[string]string) - if resolvedAuthDir, errResolveAuthDir := util.ResolveAuthDir(cfg.AuthDir); errResolveAuthDir != nil { - log.Errorf("failed to resolve auth directory for hash cache: %v", errResolveAuthDir) - } else if resolvedAuthDir != "" { - _ = filepath.Walk(resolvedAuthDir, func(path string, info fs.FileInfo, err error) error { - if err != nil { - return nil - } - if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".json") { - if data, errReadFile := os.ReadFile(path); errReadFile == nil && len(data) > 0 { - sum := sha256.Sum256(data) - normalizedPath := w.normalizeAuthPath(path) - w.lastAuthHashes[normalizedPath] = hex.EncodeToString(sum[:]) - } - } - return nil - }) - } - w.clientsMutex.Unlock() - } - - totalNewClients := authFileCount + geminiAPIKeyCount + vertexCompatAPIKeyCount + claudeAPIKeyCount + codexAPIKeyCount + openAICompatCount - - // Ensure consumers observe the new configuration before auth updates dispatch. - if w.reloadCallback != nil { - log.Debugf("triggering server update callback before auth refresh") - w.reloadCallback(cfg) - } - - w.refreshAuthState(forceAuthRefresh) - - log.Infof("full client load complete - %d clients (%d auth files + %d Gemini API keys + %d Vertex API keys + %d Claude API keys + %d Codex keys + %d OpenAI-compat)", - totalNewClients, - authFileCount, - geminiAPIKeyCount, - vertexCompatAPIKeyCount, - claudeAPIKeyCount, - codexAPIKeyCount, - openAICompatCount, - ) -} - -// createClientFromFile creates a single client instance from a given token file path. -// createClientFromFile removed (legacy) - -// addOrUpdateClient handles the addition or update of a single client. -func (w *Watcher) addOrUpdateClient(path string) { - data, errRead := os.ReadFile(path) - if errRead != nil { - log.Errorf("failed to read auth file %s: %v", filepath.Base(path), errRead) - return - } - if len(data) == 0 { - log.Debugf("ignoring empty auth file: %s", filepath.Base(path)) - return - } - - sum := sha256.Sum256(data) - curHash := hex.EncodeToString(sum[:]) - normalized := w.normalizeAuthPath(path) - - w.clientsMutex.Lock() - - cfg := w.config - if cfg == nil { - log.Error("config is nil, cannot add or update client") - w.clientsMutex.Unlock() - return - } - if prev, ok := w.lastAuthHashes[normalized]; ok && prev == curHash { - log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(path)) - w.clientsMutex.Unlock() - return - } - - // Update hash cache - w.lastAuthHashes[normalized] = curHash - - w.clientsMutex.Unlock() // Unlock before the callback - - w.refreshAuthState(false) - - if w.reloadCallback != nil { - log.Debugf("triggering server update callback after add/update") - w.reloadCallback(cfg) - } - w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path) -} - -// removeClient handles the removal of a single client. -func (w *Watcher) removeClient(path string) { - normalized := w.normalizeAuthPath(path) - w.clientsMutex.Lock() - - cfg := w.config - delete(w.lastAuthHashes, normalized) - - w.clientsMutex.Unlock() // Release the lock before the callback - - w.refreshAuthState(false) - - if w.reloadCallback != nil { - log.Debugf("triggering server update callback after removal") - w.reloadCallback(cfg) - } - w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path) -} - -// SnapshotCombinedClients returns a snapshot of current combined clients. -// SnapshotCombinedClients removed - // SnapshotCoreAuths converts current clients snapshot into core auth entries. func (w *Watcher) SnapshotCoreAuths() []*coreauth.Auth { w.clientsMutex.RLock() cfg := w.config w.clientsMutex.RUnlock() - - ctx := &synthesizer.SynthesisContext{ - Config: cfg, - AuthDir: w.authDir, - Now: time.Now(), - IDGenerator: synthesizer.NewStableIDGenerator(), - } - - var out []*coreauth.Auth - - // Use ConfigSynthesizer for API key auth entries - configSynth := synthesizer.NewConfigSynthesizer() - if auths, err := configSynth.Synthesize(ctx); err == nil { - out = append(out, auths...) - } - - // Use FileSynthesizer for file-based OAuth auth entries - fileSynth := synthesizer.NewFileSynthesizer() - if auths, err := fileSynth.Synthesize(ctx); err == nil { - out = append(out, auths...) - } - - return out -} - -// buildCombinedClientMap merges file-based clients with API key clients from the cache. -// buildCombinedClientMap removed - -// unregisterClientWithReason attempts to call client-specific unregister hooks with context. -// unregisterClientWithReason removed - -// loadFileClients scans the auth directory and creates clients from .json files. -func (w *Watcher) loadFileClients(cfg *config.Config) int { - authFileCount := 0 - successfulAuthCount := 0 - - authDir, errResolveAuthDir := util.ResolveAuthDir(cfg.AuthDir) - if errResolveAuthDir != nil { - log.Errorf("failed to resolve auth directory: %v", errResolveAuthDir) - return 0 - } - if authDir == "" { - return 0 - } - - errWalk := filepath.Walk(authDir, func(path string, info fs.FileInfo, err error) error { - if err != nil { - log.Debugf("error accessing path %s: %v", path, err) - return err - } - if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".json") { - authFileCount++ - log.Debugf("processing auth file %d: %s", authFileCount, filepath.Base(path)) - // Count readable JSON files as successful auth entries - if data, errCreate := os.ReadFile(path); errCreate == nil && len(data) > 0 { - successfulAuthCount++ - } - } - return nil - }) - - if errWalk != nil { - log.Errorf("error walking auth directory: %v", errWalk) - } - log.Debugf("auth directory scan complete - found %d .json files, %d readable", authFileCount, successfulAuthCount) - return authFileCount -} - -func BuildAPIKeyClients(cfg *config.Config) (int, int, int, int, int) { - geminiAPIKeyCount := 0 - vertexCompatAPIKeyCount := 0 - claudeAPIKeyCount := 0 - codexAPIKeyCount := 0 - openAICompatCount := 0 - - if len(cfg.GeminiKey) > 0 { - // Stateless executor handles Gemini API keys; avoid constructing legacy clients. - geminiAPIKeyCount += len(cfg.GeminiKey) - } - if len(cfg.VertexCompatAPIKey) > 0 { - vertexCompatAPIKeyCount += len(cfg.VertexCompatAPIKey) - } - if len(cfg.ClaudeKey) > 0 { - claudeAPIKeyCount += len(cfg.ClaudeKey) - } - if len(cfg.CodexKey) > 0 { - codexAPIKeyCount += len(cfg.CodexKey) - } - if len(cfg.OpenAICompatibility) > 0 { - // Do not construct legacy clients for OpenAI-compat providers; these are handled by the stateless executor. - for _, compatConfig := range cfg.OpenAICompatibility { - openAICompatCount += len(compatConfig.APIKeyEntries) - } - } - return geminiAPIKeyCount, vertexCompatAPIKeyCount, claudeAPIKeyCount, codexAPIKeyCount, openAICompatCount + return snapshotCoreAuths(cfg, w.authDir) }