refactor(watcher): split watcher.go into focused modules

- Create dispatcher.go for auth update queue management
- Create events.go for fsnotify event handling
- Create config_reload.go for hot-reload logic
- Create clients.go for client lifecycle management
- Simplify watcher.go to core coordinator (~150 lines)
- Maintain 100% API backward compatibility
- All tests passing with 72%+ coverage
This commit is contained in:
hkfires
2025-12-17 15:53:28 +08:00
parent d605985f45
commit 41c2385aca
5 changed files with 877 additions and 855 deletions

View File

@@ -0,0 +1,273 @@
// dispatcher.go implements auth update dispatching and queue management.
// It batches, deduplicates, and delivers auth updates to registered consumers.
package watcher
import (
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/synthesizer"
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
)
func (w *Watcher) setAuthUpdateQueue(queue chan<- AuthUpdate) {
w.clientsMutex.Lock()
defer w.clientsMutex.Unlock()
w.authQueue = queue
if w.dispatchCond == nil {
w.dispatchCond = sync.NewCond(&w.dispatchMu)
}
if w.dispatchCancel != nil {
w.dispatchCancel()
if w.dispatchCond != nil {
w.dispatchMu.Lock()
w.dispatchCond.Broadcast()
w.dispatchMu.Unlock()
}
w.dispatchCancel = nil
}
if queue != nil {
ctx, cancel := context.WithCancel(context.Background())
w.dispatchCancel = cancel
go w.dispatchLoop(ctx)
}
}
func (w *Watcher) dispatchRuntimeAuthUpdate(update AuthUpdate) bool {
if w == nil {
return false
}
w.clientsMutex.Lock()
if w.runtimeAuths == nil {
w.runtimeAuths = make(map[string]*coreauth.Auth)
}
switch update.Action {
case AuthUpdateActionAdd, AuthUpdateActionModify:
if update.Auth != nil && update.Auth.ID != "" {
clone := update.Auth.Clone()
w.runtimeAuths[clone.ID] = clone
if w.currentAuths == nil {
w.currentAuths = make(map[string]*coreauth.Auth)
}
w.currentAuths[clone.ID] = clone.Clone()
}
case AuthUpdateActionDelete:
id := update.ID
if id == "" && update.Auth != nil {
id = update.Auth.ID
}
if id != "" {
delete(w.runtimeAuths, id)
if w.currentAuths != nil {
delete(w.currentAuths, id)
}
}
}
w.clientsMutex.Unlock()
if w.getAuthQueue() == nil {
return false
}
w.dispatchAuthUpdates([]AuthUpdate{update})
return true
}
func (w *Watcher) refreshAuthState(force bool) {
auths := w.SnapshotCoreAuths()
w.clientsMutex.Lock()
if len(w.runtimeAuths) > 0 {
for _, a := range w.runtimeAuths {
if a != nil {
auths = append(auths, a.Clone())
}
}
}
updates := w.prepareAuthUpdatesLocked(auths, force)
w.clientsMutex.Unlock()
w.dispatchAuthUpdates(updates)
}
func (w *Watcher) prepareAuthUpdatesLocked(auths []*coreauth.Auth, force bool) []AuthUpdate {
newState := make(map[string]*coreauth.Auth, len(auths))
for _, auth := range auths {
if auth == nil || auth.ID == "" {
continue
}
newState[auth.ID] = auth.Clone()
}
if w.currentAuths == nil {
w.currentAuths = newState
if w.authQueue == nil {
return nil
}
updates := make([]AuthUpdate, 0, len(newState))
for id, auth := range newState {
updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()})
}
return updates
}
if w.authQueue == nil {
w.currentAuths = newState
return nil
}
updates := make([]AuthUpdate, 0, len(newState)+len(w.currentAuths))
for id, auth := range newState {
if existing, ok := w.currentAuths[id]; !ok {
updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: auth.Clone()})
} else if force || !authEqual(existing, auth) {
updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: auth.Clone()})
}
}
for id := range w.currentAuths {
if _, ok := newState[id]; !ok {
updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id})
}
}
w.currentAuths = newState
return updates
}
func (w *Watcher) dispatchAuthUpdates(updates []AuthUpdate) {
if len(updates) == 0 {
return
}
queue := w.getAuthQueue()
if queue == nil {
return
}
baseTS := time.Now().UnixNano()
w.dispatchMu.Lock()
if w.pendingUpdates == nil {
w.pendingUpdates = make(map[string]AuthUpdate)
}
for idx, update := range updates {
key := w.authUpdateKey(update, baseTS+int64(idx))
if _, exists := w.pendingUpdates[key]; !exists {
w.pendingOrder = append(w.pendingOrder, key)
}
w.pendingUpdates[key] = update
}
if w.dispatchCond != nil {
w.dispatchCond.Signal()
}
w.dispatchMu.Unlock()
}
func (w *Watcher) authUpdateKey(update AuthUpdate, ts int64) string {
if update.ID != "" {
return update.ID
}
return fmt.Sprintf("%s:%d", update.Action, ts)
}
func (w *Watcher) dispatchLoop(ctx context.Context) {
for {
batch, ok := w.nextPendingBatch(ctx)
if !ok {
return
}
queue := w.getAuthQueue()
if queue == nil {
if ctx.Err() != nil {
return
}
time.Sleep(10 * time.Millisecond)
continue
}
for _, update := range batch {
select {
case queue <- update:
case <-ctx.Done():
return
}
}
}
}
func (w *Watcher) nextPendingBatch(ctx context.Context) ([]AuthUpdate, bool) {
w.dispatchMu.Lock()
defer w.dispatchMu.Unlock()
for len(w.pendingOrder) == 0 {
if ctx.Err() != nil {
return nil, false
}
w.dispatchCond.Wait()
if ctx.Err() != nil {
return nil, false
}
}
batch := make([]AuthUpdate, 0, len(w.pendingOrder))
for _, key := range w.pendingOrder {
batch = append(batch, w.pendingUpdates[key])
delete(w.pendingUpdates, key)
}
w.pendingOrder = w.pendingOrder[:0]
return batch, true
}
func (w *Watcher) getAuthQueue() chan<- AuthUpdate {
w.clientsMutex.RLock()
defer w.clientsMutex.RUnlock()
return w.authQueue
}
func (w *Watcher) stopDispatch() {
if w.dispatchCancel != nil {
w.dispatchCancel()
w.dispatchCancel = nil
}
w.dispatchMu.Lock()
w.pendingOrder = nil
w.pendingUpdates = nil
if w.dispatchCond != nil {
w.dispatchCond.Broadcast()
}
w.dispatchMu.Unlock()
w.clientsMutex.Lock()
w.authQueue = nil
w.clientsMutex.Unlock()
}
func authEqual(a, b *coreauth.Auth) bool {
return reflect.DeepEqual(normalizeAuth(a), normalizeAuth(b))
}
func normalizeAuth(a *coreauth.Auth) *coreauth.Auth {
if a == nil {
return nil
}
clone := a.Clone()
clone.CreatedAt = time.Time{}
clone.UpdatedAt = time.Time{}
clone.LastRefreshedAt = time.Time{}
clone.NextRefreshAfter = time.Time{}
clone.Runtime = nil
clone.Quota.NextRecoverAt = time.Time{}
return clone
}
func snapshotCoreAuths(cfg *config.Config, authDir string) []*coreauth.Auth {
ctx := &synthesizer.SynthesisContext{
Config: cfg,
AuthDir: authDir,
Now: time.Now(),
IDGenerator: synthesizer.NewStableIDGenerator(),
}
var out []*coreauth.Auth
configSynth := synthesizer.NewConfigSynthesizer()
if auths, err := configSynth.Synthesize(ctx); err == nil {
out = append(out, auths...)
}
fileSynth := synthesizer.NewFileSynthesizer()
if auths, err := fileSynth.Synthesize(ctx); err == nil {
out = append(out, auths...)
}
return out
}