refactor(usage): replace channel-based queue with mutex-protected slice

- Switched to a slice-based queue with mutex and condition variable for better control over queuing and dispatching.
- Removed fixed buffer size to handle dynamic queuing.
- Enhanced shutdown logic to safely close the queue and wake up waiting goroutines.
This commit is contained in:
Luis Pater
2025-09-24 03:59:26 +08:00
parent 3ade03f3b3
commit 582677d067

View File

@@ -42,7 +42,11 @@ type Manager struct {
once sync.Once
stopOnce sync.Once
cancel context.CancelFunc
queue chan queueItem
mu sync.Mutex
cond *sync.Cond
queue []queueItem
closed bool
pluginsMu sync.RWMutex
plugins []Plugin
@@ -50,10 +54,9 @@ type Manager struct {
// NewManager constructs a manager with a buffered queue.
func NewManager(buffer int) *Manager {
if buffer <= 0 {
buffer = 256
}
return &Manager{queue: make(chan queueItem, buffer)}
m := &Manager{}
m.cond = sync.NewCond(&m.mu)
return m
}
// Start launches the background dispatcher. Calling Start multiple times is safe.
@@ -80,7 +83,10 @@ func (m *Manager) Stop() {
if m.cancel != nil {
m.cancel()
}
close(m.queue)
m.mu.Lock()
m.closed = true
m.mu.Unlock()
m.cond.Broadcast()
})
}
@@ -102,40 +108,30 @@ func (m *Manager) Publish(ctx context.Context, record Record) {
}
// ensure worker is running even if Start was not called explicitly
m.Start(context.Background())
select {
case m.queue <- queueItem{ctx: ctx, record: record}:
default:
// queue is full; drop the record to avoid blocking runtime paths
log.Debugf("usage: queue full, dropping record for provider %s", record.Provider)
m.mu.Lock()
if m.closed {
m.mu.Unlock()
return
}
m.queue = append(m.queue, queueItem{ctx: ctx, record: record})
m.mu.Unlock()
m.cond.Signal()
}
func (m *Manager) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
m.drain()
return
case item, ok := <-m.queue:
if !ok {
return
}
m.dispatch(item)
m.mu.Lock()
for !m.closed && len(m.queue) == 0 {
m.cond.Wait()
}
}
}
func (m *Manager) drain() {
for {
select {
case item, ok := <-m.queue:
if !ok {
return
}
m.dispatch(item)
default:
if len(m.queue) == 0 && m.closed {
m.mu.Unlock()
return
}
item := m.queue[0]
m.queue = m.queue[1:]
m.mu.Unlock()
m.dispatch(item)
}
}