diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 6f00b27b..e2311171 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -13,6 +13,7 @@ import ( "io/fs" "os" "path/filepath" + "reflect" "strings" "sync" "time" @@ -42,6 +43,24 @@ type Watcher struct { watcher *fsnotify.Watcher lastAuthHashes map[string]string lastConfigHash string + authQueue chan<- AuthUpdate + currentAuths map[string]*coreauth.Auth +} + +// AuthUpdateAction represents the type of change detected in auth sources. +type AuthUpdateAction string + +const ( + AuthUpdateActionAdd AuthUpdateAction = "add" + AuthUpdateActionModify AuthUpdateAction = "modify" + AuthUpdateActionDelete AuthUpdateAction = "delete" +) + +// AuthUpdate describes an incremental change to auth configuration. +type AuthUpdate struct { + Action AuthUpdateAction + ID string + Auth *coreauth.Auth } const ( @@ -104,6 +123,88 @@ func (w *Watcher) SetConfig(cfg *config.Config) { w.config = cfg } +// SetAuthUpdateQueue sets the queue used to emit auth updates. +func (w *Watcher) SetAuthUpdateQueue(queue chan<- AuthUpdate) { + w.clientsMutex.Lock() + defer w.clientsMutex.Unlock() + w.authQueue = queue +} + +func (w *Watcher) refreshAuthState() { + auths := w.SnapshotCoreAuths() + w.clientsMutex.Lock() + updates := w.prepareAuthUpdatesLocked(auths) + w.clientsMutex.Unlock() + w.dispatchAuthUpdates(updates) +} + +func (w *Watcher) prepareAuthUpdatesLocked(auths []*coreauth.Auth) []AuthUpdate { + newState := make(map[string]*coreauth.Auth, len(auths)) + for _, auth := range auths { + if auth == nil || auth.ID == "" { + continue + } + newState[auth.ID] = auth.Clone() + } + if w.currentAuths == nil { + w.currentAuths = newState + if w.authQueue == nil { + return nil + } + updates := make([]AuthUpdate, 0, len(newState)) + for id, auth := range newState { + updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()}) + } + return updates + } + if w.authQueue == nil { + w.currentAuths = newState + return nil + } + updates := make([]AuthUpdate, 0, len(newState)+len(w.currentAuths)) + for id, auth := range newState { + if existing, ok := w.currentAuths[id]; !ok { + updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()}) + } else if !authEqual(existing, auth) { + updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: auth.Clone()}) + } + } + for id := range w.currentAuths { + if _, ok := newState[id]; !ok { + updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id}) + } + } + w.currentAuths = newState + return updates +} + +func (w *Watcher) dispatchAuthUpdates(updates []AuthUpdate) { + if len(updates) == 0 || w.authQueue == nil { + return + } + for _, update := range updates { + w.authQueue <- update + } +} + +func authEqual(a, b *coreauth.Auth) bool { + return reflect.DeepEqual(normalizeAuth(a), normalizeAuth(b)) +} + +func normalizeAuth(a *coreauth.Auth) *coreauth.Auth { + if a == nil { + return nil + } + clone := a.Clone() + clone.CreatedAt = time.Time{} + clone.UpdatedAt = time.Time{} + clone.LastRefreshedAt = time.Time{} + clone.NextRefreshAfter = time.Time{} + clone.Runtime = nil + clone.Quota.NextRecoverAt = time.Time{} + return clone +} + // SetClients sets the file-based clients. // SetClients removed // SetAPIKeyClients removed @@ -326,6 +427,8 @@ func (w *Watcher) reloadClients() { totalNewClients := authFileCount + glAPIKeyCount + claudeAPIKeyCount + codexAPIKeyCount + openAICompatCount + w.refreshAuthState() + log.Infof("full client reload complete - old: %d clients, new: %d clients (%d auth files + %d GL API keys + %d Claude API keys + %d Codex keys + %d OpenAI-compat)", 0, totalNewClients, @@ -380,6 +483,8 @@ func (w *Watcher) addOrUpdateClient(path string) { w.clientsMutex.Unlock() // Unlock before the callback + w.refreshAuthState() + if w.reloadCallback != nil { log.Debugf("triggering server update callback after add/update") w.reloadCallback(cfg) @@ -395,6 +500,8 @@ func (w *Watcher) removeClient(path string) { w.clientsMutex.Unlock() // Release the lock before the callback + w.refreshAuthState() + if w.reloadCallback != nil { log.Debugf("triggering server update callback after removal") w.reloadCallback(cfg) diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index 80990122..ee7fac99 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -15,6 +15,7 @@ import ( "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" "github.com/router-for-me/CLIProxyAPI/v6/internal/runtime/executor" "github.com/router-for-me/CLIProxyAPI/v6/internal/util" + "github.com/router-for-me/CLIProxyAPI/v6/internal/watcher" sdkaccess "github.com/router-for-me/CLIProxyAPI/v6/sdk/access" _ "github.com/router-for-me/CLIProxyAPI/v6/sdk/access/providers/configapikey" sdkAuth "github.com/router-for-me/CLIProxyAPI/v6/sdk/auth" @@ -39,6 +40,8 @@ type Service struct { watcher *WatcherWrapper watcherCancel context.CancelFunc + authUpdates chan watcher.AuthUpdate + authQueueStop context.CancelFunc // legacy client caches removed authManager *sdkAuth.Manager @@ -70,6 +73,132 @@ func (s *Service) refreshAccessProviders(cfg *config.Config) { s.accessManager.SetProviders(providers) } +func (s *Service) ensureAuthUpdateQueue(ctx context.Context) { + if s == nil { + return + } + if s.authUpdates == nil { + s.authUpdates = make(chan watcher.AuthUpdate, 64) + } + if s.authQueueStop != nil { + return + } + queueCtx, cancel := context.WithCancel(ctx) + s.authQueueStop = cancel + go s.consumeAuthUpdates(queueCtx) +} + +func (s *Service) consumeAuthUpdates(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case update, ok := <-s.authUpdates: + if !ok { + return + } + s.handleAuthUpdate(ctx, update) + } + } +} + +func (s *Service) handleAuthUpdate(ctx context.Context, update watcher.AuthUpdate) { + if s == nil { + return + } + s.cfgMu.RLock() + cfg := s.cfg + s.cfgMu.RUnlock() + if cfg == nil || s.coreManager == nil { + return + } + switch update.Action { + case watcher.AuthUpdateActionAdd, watcher.AuthUpdateActionModify: + if update.Auth == nil || update.Auth.ID == "" { + return + } + s.applyCoreAuthAddOrUpdate(ctx, update.Auth) + case watcher.AuthUpdateActionDelete: + id := update.ID + if id == "" && update.Auth != nil { + id = update.Auth.ID + } + if id == "" { + return + } + s.applyCoreAuthRemoval(ctx, id) + default: + log.Debugf("received unknown auth update action: %v", update.Action) + } +} + +func (s *Service) applyCoreAuthAddOrUpdate(ctx context.Context, auth *coreauth.Auth) { + if s == nil || auth == nil || auth.ID == "" { + return + } + if s.coreManager == nil { + return + } + auth = auth.Clone() + s.ensureExecutorsForAuth(auth) + s.registerModelsForAuth(auth) + if existing, ok := s.coreManager.GetByID(auth.ID); ok && existing != nil { + auth.CreatedAt = existing.CreatedAt + auth.LastRefreshedAt = existing.LastRefreshedAt + auth.NextRefreshAfter = existing.NextRefreshAfter + if _, err := s.coreManager.Update(ctx, auth); err != nil { + log.Errorf("failed to update auth %s: %v", auth.ID, err) + } + return + } + if _, err := s.coreManager.Register(ctx, auth); err != nil { + log.Errorf("failed to register auth %s: %v", auth.ID, err) + } +} + +func (s *Service) applyCoreAuthRemoval(ctx context.Context, id string) { + if s == nil || id == "" { + return + } + if s.coreManager == nil { + return + } + GlobalModelRegistry().UnregisterClient(id) + if existing, ok := s.coreManager.GetByID(id); ok && existing != nil { + existing.Disabled = true + existing.Status = coreauth.StatusDisabled + if _, err := s.coreManager.Update(ctx, existing); err != nil { + log.Errorf("failed to disable auth %s: %v", id, err) + } + } +} + +func (s *Service) ensureExecutorsForAuth(a *coreauth.Auth) { + if s == nil || a == nil { + return + } + switch strings.ToLower(a.Provider) { + case "gemini": + s.coreManager.RegisterExecutor(executor.NewGeminiExecutor(s.cfg)) + case "gemini-cli": + s.coreManager.RegisterExecutor(executor.NewGeminiCLIExecutor(s.cfg)) + case "gemini-web": + s.coreManager.RegisterExecutor(executor.NewGeminiWebExecutor(s.cfg)) + case "claude": + s.coreManager.RegisterExecutor(executor.NewClaudeExecutor(s.cfg)) + case "codex": + s.coreManager.RegisterExecutor(executor.NewCodexExecutor(s.cfg)) + case "qwen": + s.coreManager.RegisterExecutor(executor.NewQwenExecutor(s.cfg)) + default: + providerKey := strings.ToLower(strings.TrimSpace(a.Provider)) + if providerKey == "" { + providerKey = "openai-compatibility" + } + s.coreManager.RegisterExecutor(executor.NewOpenAICompatExecutor(providerKey, s.cfg)) + } +} + // Run starts the service and blocks until the context is cancelled or the server stops. func (s *Service) Run(ctx context.Context) error { if s == nil { @@ -150,15 +279,13 @@ func (s *Service) Run(ctx context.Context) error { newCfg = s.cfg s.cfgMu.RUnlock() } - - // Pull the latest auth snapshot and sync - auths := watcherWrapper.SnapshotAuths() - s.syncCoreAuthFromAuths(ctx, auths) + if newCfg == nil { + return + } s.refreshAccessProviders(newCfg) if s.server != nil { s.server.UpdateClients(newCfg) } - s.cfgMu.Lock() s.cfg = newCfg s.cfgMu.Unlock() @@ -170,6 +297,10 @@ func (s *Service) Run(ctx context.Context) error { return fmt.Errorf("cliproxy: failed to create watcher: %w", err) } s.watcher = watcherWrapper + s.ensureAuthUpdateQueue(ctx) + if s.authUpdates != nil { + watcherWrapper.SetAuthUpdateQueue(s.authUpdates) + } watcherWrapper.SetConfig(s.cfg) watcherCtx, watcherCancel := context.WithCancel(context.Background()) @@ -234,6 +365,10 @@ func (s *Service) Shutdown(ctx context.Context) error { shutdownErr = err } } + if s.authQueueStop != nil { + s.authQueueStop() + s.authQueueStop = nil + } // no legacy clients to persist @@ -280,41 +415,7 @@ func (s *Service) syncCoreAuthFromAuths(ctx context.Context, auths []*coreauth.A continue } seen[a.ID] = struct{}{} - // Ensure executors registered per provider: prefer stateless where available. - switch strings.ToLower(a.Provider) { - case "gemini": - s.coreManager.RegisterExecutor(executor.NewGeminiExecutor(s.cfg)) - case "gemini-cli": - s.coreManager.RegisterExecutor(executor.NewGeminiCLIExecutor(s.cfg)) - case "gemini-web": - s.coreManager.RegisterExecutor(executor.NewGeminiWebExecutor(s.cfg)) - case "claude": - s.coreManager.RegisterExecutor(executor.NewClaudeExecutor(s.cfg)) - case "codex": - s.coreManager.RegisterExecutor(executor.NewCodexExecutor(s.cfg)) - case "qwen": - s.coreManager.RegisterExecutor(executor.NewQwenExecutor(s.cfg)) - default: - providerKey := strings.ToLower(strings.TrimSpace(a.Provider)) - if providerKey == "" { - providerKey = "openai-compatibility" - } - s.coreManager.RegisterExecutor(executor.NewOpenAICompatExecutor(providerKey, s.cfg)) - } - - // Preserve existing temporal fields - if existing, ok := s.coreManager.GetByID(a.ID); ok && existing != nil { - a.CreatedAt = existing.CreatedAt - a.LastRefreshedAt = existing.LastRefreshedAt - a.NextRefreshAfter = existing.NextRefreshAfter - } - // Ensure model registry reflects core auth identity - s.registerModelsForAuth(a) - if _, ok := s.coreManager.GetByID(a.ID); ok { - _, _ = s.coreManager.Update(ctx, a) - } else { - _, _ = s.coreManager.Register(ctx, a) - } + s.applyCoreAuthAddOrUpdate(ctx, a) } // Disable removed auths for _, stored := range s.coreManager.List() { @@ -324,11 +425,7 @@ func (s *Service) syncCoreAuthFromAuths(ctx context.Context, auths []*coreauth.A if _, ok := seen[stored.ID]; ok { continue } - stored.Disabled = true - stored.Status = coreauth.StatusDisabled - // Unregister from model registry when disabled - GlobalModelRegistry().UnregisterClient(stored.ID) - _, _ = s.coreManager.Update(ctx, stored) + s.applyCoreAuthRemoval(ctx, stored.ID) } } diff --git a/sdk/cliproxy/types.go b/sdk/cliproxy/types.go index b94516bb..1bc0d3fa 100644 --- a/sdk/cliproxy/types.go +++ b/sdk/cliproxy/types.go @@ -4,6 +4,7 @@ import ( "context" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" + "github.com/router-for-me/CLIProxyAPI/v6/internal/watcher" coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" ) @@ -39,8 +40,9 @@ type WatcherWrapper struct { start func(ctx context.Context) error stop func() error - setConfig func(cfg *config.Config) - snapshotAuths func() []*coreauth.Auth + setConfig func(cfg *config.Config) + snapshotAuths func() []*coreauth.Auth + setUpdateQueue func(queue chan<- watcher.AuthUpdate) } // Start proxies to the underlying watcher Start implementation. @@ -80,3 +82,11 @@ func (w *WatcherWrapper) SnapshotAuths() []*coreauth.Auth { } return w.snapshotAuths() } + +// SetAuthUpdateQueue registers the channel used to propagate auth updates. +func (w *WatcherWrapper) SetAuthUpdateQueue(queue chan<- watcher.AuthUpdate) { + if w == nil || w.setUpdateQueue == nil { + return + } + w.setUpdateQueue(queue) +} diff --git a/sdk/cliproxy/watcher.go b/sdk/cliproxy/watcher.go index b9f7e6a2..81e4c18a 100644 --- a/sdk/cliproxy/watcher.go +++ b/sdk/cliproxy/watcher.go @@ -25,5 +25,8 @@ func defaultWatcherFactory(configPath, authDir string, reload func(*config.Confi w.SetConfig(cfg) }, snapshotAuths: func() []*coreauth.Auth { return w.SnapshotCoreAuths() }, + setUpdateQueue: func(queue chan<- watcher.AuthUpdate) { + w.SetAuthUpdateQueue(queue) + }, }, nil }