feat(auth): enhance watcher with asynchronous dispatch and buffering

- Added async dispatch loop to `Watcher` for handling incremental `AuthUpdate` with in-memory buffering.
- Improved resilience against high-frequency auth changes by coalescing updates and reducing redundant processing.
- Updated `cliproxy` service to increase auth update queue capacity and optimize backlog consumption.
- Added detailed SDK integration documentation in English and Chinese (`sdk-watcher.md`, `sdk-watcher_CN.md`).
This commit is contained in:
Luis Pater
2025-09-23 04:33:48 +08:00
parent 792ec49e5b
commit 6046a8c95b
4 changed files with 195 additions and 6 deletions

View File

@@ -45,6 +45,11 @@ type Watcher struct {
lastConfigHash string
authQueue chan<- AuthUpdate
currentAuths map[string]*coreauth.Auth
dispatchMu sync.Mutex
dispatchCond *sync.Cond
pendingUpdates map[string]AuthUpdate
pendingOrder []string
dispatchCancel context.CancelFunc
}
// AuthUpdateAction represents the type of change detected in auth sources.
@@ -78,13 +83,15 @@ func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config))
return nil, errNewWatcher
}
return &Watcher{
w := &Watcher{
configPath: configPath,
authDir: authDir,
reloadCallback: reloadCallback,
watcher: watcher,
lastAuthHashes: make(map[string]string),
}, nil
}
w.dispatchCond = sync.NewCond(&w.dispatchMu)
return w, nil
}
// Start begins watching the configuration file and authentication directory
@@ -113,6 +120,7 @@ func (w *Watcher) Start(ctx context.Context) error {
// Stop stops the file watcher
func (w *Watcher) Stop() error {
w.stopDispatch()
return w.watcher.Close()
}
@@ -128,6 +136,23 @@ func (w *Watcher) SetAuthUpdateQueue(queue chan<- AuthUpdate) {
w.clientsMutex.Lock()
defer w.clientsMutex.Unlock()
w.authQueue = queue
if w.dispatchCond == nil {
w.dispatchCond = sync.NewCond(&w.dispatchMu)
}
if w.dispatchCancel != nil {
w.dispatchCancel()
if w.dispatchCond != nil {
w.dispatchMu.Lock()
w.dispatchCond.Broadcast()
w.dispatchMu.Unlock()
}
w.dispatchCancel = nil
}
if queue != nil {
ctx, cancel := context.WithCancel(context.Background())
w.dispatchCancel = cancel
go w.dispatchLoop(ctx)
}
}
func (w *Watcher) refreshAuthState() {
@@ -179,12 +204,104 @@ func (w *Watcher) prepareAuthUpdatesLocked(auths []*coreauth.Auth) []AuthUpdate
}
func (w *Watcher) dispatchAuthUpdates(updates []AuthUpdate) {
if len(updates) == 0 || w.authQueue == nil {
if len(updates) == 0 {
return
}
for _, update := range updates {
w.authQueue <- update
queue := w.getAuthQueue()
if queue == nil {
return
}
baseTS := time.Now().UnixNano()
w.dispatchMu.Lock()
if w.pendingUpdates == nil {
w.pendingUpdates = make(map[string]AuthUpdate)
}
for idx, update := range updates {
key := w.authUpdateKey(update, baseTS+int64(idx))
if _, exists := w.pendingUpdates[key]; !exists {
w.pendingOrder = append(w.pendingOrder, key)
}
w.pendingUpdates[key] = update
}
if w.dispatchCond != nil {
w.dispatchCond.Signal()
}
w.dispatchMu.Unlock()
}
func (w *Watcher) authUpdateKey(update AuthUpdate, ts int64) string {
if update.ID != "" {
return update.ID
}
return fmt.Sprintf("%s:%d", update.Action, ts)
}
func (w *Watcher) dispatchLoop(ctx context.Context) {
for {
batch, ok := w.nextPendingBatch(ctx)
if !ok {
return
}
queue := w.getAuthQueue()
if queue == nil {
if ctx.Err() != nil {
return
}
time.Sleep(10 * time.Millisecond)
continue
}
for _, update := range batch {
select {
case queue <- update:
case <-ctx.Done():
return
}
}
}
}
func (w *Watcher) nextPendingBatch(ctx context.Context) ([]AuthUpdate, bool) {
w.dispatchMu.Lock()
defer w.dispatchMu.Unlock()
for len(w.pendingOrder) == 0 {
if ctx.Err() != nil {
return nil, false
}
w.dispatchCond.Wait()
if ctx.Err() != nil {
return nil, false
}
}
batch := make([]AuthUpdate, 0, len(w.pendingOrder))
for _, key := range w.pendingOrder {
batch = append(batch, w.pendingUpdates[key])
delete(w.pendingUpdates, key)
}
w.pendingOrder = w.pendingOrder[:0]
return batch, true
}
func (w *Watcher) getAuthQueue() chan<- AuthUpdate {
w.clientsMutex.RLock()
defer w.clientsMutex.RUnlock()
return w.authQueue
}
func (w *Watcher) stopDispatch() {
if w.dispatchCancel != nil {
w.dispatchCancel()
w.dispatchCancel = nil
}
w.dispatchMu.Lock()
w.pendingOrder = nil
w.pendingUpdates = nil
if w.dispatchCond != nil {
w.dispatchCond.Broadcast()
}
w.dispatchMu.Unlock()
w.clientsMutex.Lock()
w.authQueue = nil
w.clientsMutex.Unlock()
}
func authEqual(a, b *coreauth.Auth) bool {