feat(auth): implement incremental auth updates with queue integration

- Added support for incremental auth updates using `AuthUpdate` and `AuthUpdateAction`.
- Integrated `SetAuthUpdateQueue` to propagate updates through a dedicated channel.
- Introduced new methods for handling auth add, modify, and delete actions.
- Updated service to ensure auth update queues are correctly initialized and consumed.
- Improved auth state synchronization across core and file-based clients with real-time updates.
- Refactored redundant auth handling logic for better efficiency and maintainability.
This commit is contained in:
Luis Pater
2025-09-23 04:16:22 +08:00
parent 3ffd87d8de
commit 792ec49e5b
4 changed files with 264 additions and 47 deletions

View File

@@ -13,6 +13,7 @@ import (
"io/fs" "io/fs"
"os" "os"
"path/filepath" "path/filepath"
"reflect"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -42,6 +43,24 @@ type Watcher struct {
watcher *fsnotify.Watcher watcher *fsnotify.Watcher
lastAuthHashes map[string]string lastAuthHashes map[string]string
lastConfigHash string lastConfigHash string
authQueue chan<- AuthUpdate
currentAuths map[string]*coreauth.Auth
}
// AuthUpdateAction represents the type of change detected in auth sources.
type AuthUpdateAction string
const (
AuthUpdateActionAdd AuthUpdateAction = "add"
AuthUpdateActionModify AuthUpdateAction = "modify"
AuthUpdateActionDelete AuthUpdateAction = "delete"
)
// AuthUpdate describes an incremental change to auth configuration.
type AuthUpdate struct {
Action AuthUpdateAction
ID string
Auth *coreauth.Auth
} }
const ( const (
@@ -104,6 +123,88 @@ func (w *Watcher) SetConfig(cfg *config.Config) {
w.config = cfg w.config = cfg
} }
// SetAuthUpdateQueue sets the queue used to emit auth updates.
func (w *Watcher) SetAuthUpdateQueue(queue chan<- AuthUpdate) {
w.clientsMutex.Lock()
defer w.clientsMutex.Unlock()
w.authQueue = queue
}
func (w *Watcher) refreshAuthState() {
auths := w.SnapshotCoreAuths()
w.clientsMutex.Lock()
updates := w.prepareAuthUpdatesLocked(auths)
w.clientsMutex.Unlock()
w.dispatchAuthUpdates(updates)
}
func (w *Watcher) prepareAuthUpdatesLocked(auths []*coreauth.Auth) []AuthUpdate {
newState := make(map[string]*coreauth.Auth, len(auths))
for _, auth := range auths {
if auth == nil || auth.ID == "" {
continue
}
newState[auth.ID] = auth.Clone()
}
if w.currentAuths == nil {
w.currentAuths = newState
if w.authQueue == nil {
return nil
}
updates := make([]AuthUpdate, 0, len(newState))
for id, auth := range newState {
updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()})
}
return updates
}
if w.authQueue == nil {
w.currentAuths = newState
return nil
}
updates := make([]AuthUpdate, 0, len(newState)+len(w.currentAuths))
for id, auth := range newState {
if existing, ok := w.currentAuths[id]; !ok {
updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()})
} else if !authEqual(existing, auth) {
updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: auth.Clone()})
}
}
for id := range w.currentAuths {
if _, ok := newState[id]; !ok {
updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id})
}
}
w.currentAuths = newState
return updates
}
func (w *Watcher) dispatchAuthUpdates(updates []AuthUpdate) {
if len(updates) == 0 || w.authQueue == nil {
return
}
for _, update := range updates {
w.authQueue <- update
}
}
func authEqual(a, b *coreauth.Auth) bool {
return reflect.DeepEqual(normalizeAuth(a), normalizeAuth(b))
}
func normalizeAuth(a *coreauth.Auth) *coreauth.Auth {
if a == nil {
return nil
}
clone := a.Clone()
clone.CreatedAt = time.Time{}
clone.UpdatedAt = time.Time{}
clone.LastRefreshedAt = time.Time{}
clone.NextRefreshAfter = time.Time{}
clone.Runtime = nil
clone.Quota.NextRecoverAt = time.Time{}
return clone
}
// SetClients sets the file-based clients. // SetClients sets the file-based clients.
// SetClients removed // SetClients removed
// SetAPIKeyClients removed // SetAPIKeyClients removed
@@ -326,6 +427,8 @@ func (w *Watcher) reloadClients() {
totalNewClients := authFileCount + glAPIKeyCount + claudeAPIKeyCount + codexAPIKeyCount + openAICompatCount totalNewClients := authFileCount + glAPIKeyCount + claudeAPIKeyCount + codexAPIKeyCount + openAICompatCount
w.refreshAuthState()
log.Infof("full client reload complete - old: %d clients, new: %d clients (%d auth files + %d GL API keys + %d Claude API keys + %d Codex keys + %d OpenAI-compat)", log.Infof("full client reload complete - old: %d clients, new: %d clients (%d auth files + %d GL API keys + %d Claude API keys + %d Codex keys + %d OpenAI-compat)",
0, 0,
totalNewClients, totalNewClients,
@@ -380,6 +483,8 @@ func (w *Watcher) addOrUpdateClient(path string) {
w.clientsMutex.Unlock() // Unlock before the callback w.clientsMutex.Unlock() // Unlock before the callback
w.refreshAuthState()
if w.reloadCallback != nil { if w.reloadCallback != nil {
log.Debugf("triggering server update callback after add/update") log.Debugf("triggering server update callback after add/update")
w.reloadCallback(cfg) w.reloadCallback(cfg)
@@ -395,6 +500,8 @@ func (w *Watcher) removeClient(path string) {
w.clientsMutex.Unlock() // Release the lock before the callback w.clientsMutex.Unlock() // Release the lock before the callback
w.refreshAuthState()
if w.reloadCallback != nil { if w.reloadCallback != nil {
log.Debugf("triggering server update callback after removal") log.Debugf("triggering server update callback after removal")
w.reloadCallback(cfg) w.reloadCallback(cfg)

View File

@@ -15,6 +15,7 @@ import (
"github.com/router-for-me/CLIProxyAPI/v6/internal/registry" "github.com/router-for-me/CLIProxyAPI/v6/internal/registry"
"github.com/router-for-me/CLIProxyAPI/v6/internal/runtime/executor" "github.com/router-for-me/CLIProxyAPI/v6/internal/runtime/executor"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util" "github.com/router-for-me/CLIProxyAPI/v6/internal/util"
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher"
sdkaccess "github.com/router-for-me/CLIProxyAPI/v6/sdk/access" sdkaccess "github.com/router-for-me/CLIProxyAPI/v6/sdk/access"
_ "github.com/router-for-me/CLIProxyAPI/v6/sdk/access/providers/configapikey" _ "github.com/router-for-me/CLIProxyAPI/v6/sdk/access/providers/configapikey"
sdkAuth "github.com/router-for-me/CLIProxyAPI/v6/sdk/auth" sdkAuth "github.com/router-for-me/CLIProxyAPI/v6/sdk/auth"
@@ -39,6 +40,8 @@ type Service struct {
watcher *WatcherWrapper watcher *WatcherWrapper
watcherCancel context.CancelFunc watcherCancel context.CancelFunc
authUpdates chan watcher.AuthUpdate
authQueueStop context.CancelFunc
// legacy client caches removed // legacy client caches removed
authManager *sdkAuth.Manager authManager *sdkAuth.Manager
@@ -70,6 +73,132 @@ func (s *Service) refreshAccessProviders(cfg *config.Config) {
s.accessManager.SetProviders(providers) s.accessManager.SetProviders(providers)
} }
func (s *Service) ensureAuthUpdateQueue(ctx context.Context) {
if s == nil {
return
}
if s.authUpdates == nil {
s.authUpdates = make(chan watcher.AuthUpdate, 64)
}
if s.authQueueStop != nil {
return
}
queueCtx, cancel := context.WithCancel(ctx)
s.authQueueStop = cancel
go s.consumeAuthUpdates(queueCtx)
}
func (s *Service) consumeAuthUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case update, ok := <-s.authUpdates:
if !ok {
return
}
s.handleAuthUpdate(ctx, update)
}
}
}
func (s *Service) handleAuthUpdate(ctx context.Context, update watcher.AuthUpdate) {
if s == nil {
return
}
s.cfgMu.RLock()
cfg := s.cfg
s.cfgMu.RUnlock()
if cfg == nil || s.coreManager == nil {
return
}
switch update.Action {
case watcher.AuthUpdateActionAdd, watcher.AuthUpdateActionModify:
if update.Auth == nil || update.Auth.ID == "" {
return
}
s.applyCoreAuthAddOrUpdate(ctx, update.Auth)
case watcher.AuthUpdateActionDelete:
id := update.ID
if id == "" && update.Auth != nil {
id = update.Auth.ID
}
if id == "" {
return
}
s.applyCoreAuthRemoval(ctx, id)
default:
log.Debugf("received unknown auth update action: %v", update.Action)
}
}
func (s *Service) applyCoreAuthAddOrUpdate(ctx context.Context, auth *coreauth.Auth) {
if s == nil || auth == nil || auth.ID == "" {
return
}
if s.coreManager == nil {
return
}
auth = auth.Clone()
s.ensureExecutorsForAuth(auth)
s.registerModelsForAuth(auth)
if existing, ok := s.coreManager.GetByID(auth.ID); ok && existing != nil {
auth.CreatedAt = existing.CreatedAt
auth.LastRefreshedAt = existing.LastRefreshedAt
auth.NextRefreshAfter = existing.NextRefreshAfter
if _, err := s.coreManager.Update(ctx, auth); err != nil {
log.Errorf("failed to update auth %s: %v", auth.ID, err)
}
return
}
if _, err := s.coreManager.Register(ctx, auth); err != nil {
log.Errorf("failed to register auth %s: %v", auth.ID, err)
}
}
func (s *Service) applyCoreAuthRemoval(ctx context.Context, id string) {
if s == nil || id == "" {
return
}
if s.coreManager == nil {
return
}
GlobalModelRegistry().UnregisterClient(id)
if existing, ok := s.coreManager.GetByID(id); ok && existing != nil {
existing.Disabled = true
existing.Status = coreauth.StatusDisabled
if _, err := s.coreManager.Update(ctx, existing); err != nil {
log.Errorf("failed to disable auth %s: %v", id, err)
}
}
}
func (s *Service) ensureExecutorsForAuth(a *coreauth.Auth) {
if s == nil || a == nil {
return
}
switch strings.ToLower(a.Provider) {
case "gemini":
s.coreManager.RegisterExecutor(executor.NewGeminiExecutor(s.cfg))
case "gemini-cli":
s.coreManager.RegisterExecutor(executor.NewGeminiCLIExecutor(s.cfg))
case "gemini-web":
s.coreManager.RegisterExecutor(executor.NewGeminiWebExecutor(s.cfg))
case "claude":
s.coreManager.RegisterExecutor(executor.NewClaudeExecutor(s.cfg))
case "codex":
s.coreManager.RegisterExecutor(executor.NewCodexExecutor(s.cfg))
case "qwen":
s.coreManager.RegisterExecutor(executor.NewQwenExecutor(s.cfg))
default:
providerKey := strings.ToLower(strings.TrimSpace(a.Provider))
if providerKey == "" {
providerKey = "openai-compatibility"
}
s.coreManager.RegisterExecutor(executor.NewOpenAICompatExecutor(providerKey, s.cfg))
}
}
// Run starts the service and blocks until the context is cancelled or the server stops. // Run starts the service and blocks until the context is cancelled or the server stops.
func (s *Service) Run(ctx context.Context) error { func (s *Service) Run(ctx context.Context) error {
if s == nil { if s == nil {
@@ -150,15 +279,13 @@ func (s *Service) Run(ctx context.Context) error {
newCfg = s.cfg newCfg = s.cfg
s.cfgMu.RUnlock() s.cfgMu.RUnlock()
} }
if newCfg == nil {
// Pull the latest auth snapshot and sync return
auths := watcherWrapper.SnapshotAuths() }
s.syncCoreAuthFromAuths(ctx, auths)
s.refreshAccessProviders(newCfg) s.refreshAccessProviders(newCfg)
if s.server != nil { if s.server != nil {
s.server.UpdateClients(newCfg) s.server.UpdateClients(newCfg)
} }
s.cfgMu.Lock() s.cfgMu.Lock()
s.cfg = newCfg s.cfg = newCfg
s.cfgMu.Unlock() s.cfgMu.Unlock()
@@ -170,6 +297,10 @@ func (s *Service) Run(ctx context.Context) error {
return fmt.Errorf("cliproxy: failed to create watcher: %w", err) return fmt.Errorf("cliproxy: failed to create watcher: %w", err)
} }
s.watcher = watcherWrapper s.watcher = watcherWrapper
s.ensureAuthUpdateQueue(ctx)
if s.authUpdates != nil {
watcherWrapper.SetAuthUpdateQueue(s.authUpdates)
}
watcherWrapper.SetConfig(s.cfg) watcherWrapper.SetConfig(s.cfg)
watcherCtx, watcherCancel := context.WithCancel(context.Background()) watcherCtx, watcherCancel := context.WithCancel(context.Background())
@@ -234,6 +365,10 @@ func (s *Service) Shutdown(ctx context.Context) error {
shutdownErr = err shutdownErr = err
} }
} }
if s.authQueueStop != nil {
s.authQueueStop()
s.authQueueStop = nil
}
// no legacy clients to persist // no legacy clients to persist
@@ -280,41 +415,7 @@ func (s *Service) syncCoreAuthFromAuths(ctx context.Context, auths []*coreauth.A
continue continue
} }
seen[a.ID] = struct{}{} seen[a.ID] = struct{}{}
// Ensure executors registered per provider: prefer stateless where available. s.applyCoreAuthAddOrUpdate(ctx, a)
switch strings.ToLower(a.Provider) {
case "gemini":
s.coreManager.RegisterExecutor(executor.NewGeminiExecutor(s.cfg))
case "gemini-cli":
s.coreManager.RegisterExecutor(executor.NewGeminiCLIExecutor(s.cfg))
case "gemini-web":
s.coreManager.RegisterExecutor(executor.NewGeminiWebExecutor(s.cfg))
case "claude":
s.coreManager.RegisterExecutor(executor.NewClaudeExecutor(s.cfg))
case "codex":
s.coreManager.RegisterExecutor(executor.NewCodexExecutor(s.cfg))
case "qwen":
s.coreManager.RegisterExecutor(executor.NewQwenExecutor(s.cfg))
default:
providerKey := strings.ToLower(strings.TrimSpace(a.Provider))
if providerKey == "" {
providerKey = "openai-compatibility"
}
s.coreManager.RegisterExecutor(executor.NewOpenAICompatExecutor(providerKey, s.cfg))
}
// Preserve existing temporal fields
if existing, ok := s.coreManager.GetByID(a.ID); ok && existing != nil {
a.CreatedAt = existing.CreatedAt
a.LastRefreshedAt = existing.LastRefreshedAt
a.NextRefreshAfter = existing.NextRefreshAfter
}
// Ensure model registry reflects core auth identity
s.registerModelsForAuth(a)
if _, ok := s.coreManager.GetByID(a.ID); ok {
_, _ = s.coreManager.Update(ctx, a)
} else {
_, _ = s.coreManager.Register(ctx, a)
}
} }
// Disable removed auths // Disable removed auths
for _, stored := range s.coreManager.List() { for _, stored := range s.coreManager.List() {
@@ -324,11 +425,7 @@ func (s *Service) syncCoreAuthFromAuths(ctx context.Context, auths []*coreauth.A
if _, ok := seen[stored.ID]; ok { if _, ok := seen[stored.ID]; ok {
continue continue
} }
stored.Disabled = true s.applyCoreAuthRemoval(ctx, stored.ID)
stored.Status = coreauth.StatusDisabled
// Unregister from model registry when disabled
GlobalModelRegistry().UnregisterClient(stored.ID)
_, _ = s.coreManager.Update(ctx, stored)
} }
} }

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher"
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
) )
@@ -39,8 +40,9 @@ type WatcherWrapper struct {
start func(ctx context.Context) error start func(ctx context.Context) error
stop func() error stop func() error
setConfig func(cfg *config.Config) setConfig func(cfg *config.Config)
snapshotAuths func() []*coreauth.Auth snapshotAuths func() []*coreauth.Auth
setUpdateQueue func(queue chan<- watcher.AuthUpdate)
} }
// Start proxies to the underlying watcher Start implementation. // Start proxies to the underlying watcher Start implementation.
@@ -80,3 +82,11 @@ func (w *WatcherWrapper) SnapshotAuths() []*coreauth.Auth {
} }
return w.snapshotAuths() return w.snapshotAuths()
} }
// SetAuthUpdateQueue registers the channel used to propagate auth updates.
func (w *WatcherWrapper) SetAuthUpdateQueue(queue chan<- watcher.AuthUpdate) {
if w == nil || w.setUpdateQueue == nil {
return
}
w.setUpdateQueue(queue)
}

View File

@@ -25,5 +25,8 @@ func defaultWatcherFactory(configPath, authDir string, reload func(*config.Confi
w.SetConfig(cfg) w.SetConfig(cfg)
}, },
snapshotAuths: func() []*coreauth.Auth { return w.SnapshotCoreAuths() }, snapshotAuths: func() []*coreauth.Auth { return w.SnapshotCoreAuths() },
setUpdateQueue: func(queue chan<- watcher.AuthUpdate) {
w.SetAuthUpdateQueue(queue)
},
}, nil }, nil
} }