Files
CLIProxyAPI/sdk/cliproxy/usage/manager.go
Luis Pater 3ade03f3b3 feat(usage): implement usage tracking infrastructure across executors
- Added `LoggerPlugin` to log usage metrics for observability.
- Introduced a new `Manager` to handle usage record queuing and plugin registration.
- Integrated new usage reporter and detailed metrics parsing into executors, covering providers like OpenAI, Codex, Claude, and Gemini.
- Improved token usage breakdown across streaming and non-streaming responses.
2025-09-24 03:49:09 +08:00

183 lines
3.9 KiB
Go

package usage
import (
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
// Record contains the usage statistics captured for a single provider request.
// Records are passed by value to Manager.Publish and later handed unchanged to
// every registered Plugin. Within this file only Provider is read (for the
// log line emitted when a record is dropped); the other fields are opaque
// payload populated by callers.
type Record struct {
// Provider identifies the upstream provider; it appears in the drop log message.
Provider string
// Model is presumably the model identifier used for the request — set by callers, not interpreted here.
Model string
// APIKey is presumably the client API key associated with the request — not interpreted here.
APIKey string
// AuthID is presumably the identifier of the auth entry used — not interpreted here.
AuthID string
// RequestedAt is a timestamp supplied by the caller; this file never sets or defaults it.
RequestedAt time.Time
// Detail carries the token counters for the request.
Detail Detail
}
// Detail holds the token usage breakdown.
// All counters are plain int64 values filled in by the caller; nothing in this
// file computes, validates, or reconciles them.
type Detail struct {
// InputTokens is the input (prompt-side) token count as reported by the caller.
InputTokens int64
// OutputTokens is the output (completion-side) token count as reported by the caller.
OutputTokens int64
// ReasoningTokens is the reasoning token count as reported by the caller.
ReasoningTokens int64
// CachedTokens is the cached token count as reported by the caller.
CachedTokens int64
// TotalTokens is the overall total as reported by the caller.
// NOTE(review): not derived from the other fields here — confirm how producers compute it.
TotalTokens int64
}
// Plugin consumes usage records emitted by the proxy runtime.
//
// HandleUsage is called from the Manager's dispatcher goroutine, once per
// dequeued record, with plugins invoked one after another in registration
// order. A panic raised by HandleUsage is recovered and logged by the manager,
// so it does not stop delivery to the remaining plugins.
type Plugin interface {
// HandleUsage receives the record together with the context that was
// passed to Publish for it; the manager forwards that context as-is.
HandleUsage(ctx context.Context, record Record)
}
// queueItem is the unit stored on the Manager queue: a usage record paired
// with the context supplied by the publisher, so both can be handed to the
// plugins when the item is dispatched.
type queueItem struct {
// ctx is the context given to Publish; it is forwarded to plugins unchanged.
ctx context.Context
// record is the usage record to deliver.
record Record
}
// Manager maintains a queue of usage records and delivers them to registered plugins.
// Records are enqueued by Publish and consumed by a single background
// goroutine started by Start. Use NewManager to obtain an instance: that is
// where the queue channel is allocated.
type Manager struct {
// once ensures the dispatcher goroutine is launched at most one time.
once sync.Once
// stopOnce ensures the shutdown sequence in Stop runs at most one time.
stopOnce sync.Once
// cancel cancels the dispatcher's context; it is assigned by Start and stays nil until then.
cancel context.CancelFunc
// queue is the buffered channel of pending records.
queue chan queueItem
// pluginsMu guards plugins.
pluginsMu sync.RWMutex
// plugins is the delivery list, in registration order.
plugins []Plugin
}
// NewManager returns a Manager whose queue can hold up to buffer pending
// records. A zero or negative buffer falls back to a capacity of 256. The
// dispatcher goroutine is not started here; see Start and Publish.
func NewManager(buffer int) *Manager {
	const fallbackCapacity = 256

	capacity := buffer
	if capacity <= 0 {
		capacity = fallbackCapacity
	}

	mgr := &Manager{}
	mgr.queue = make(chan queueItem, capacity)
	return mgr
}
// Start launches the background dispatcher goroutine. Only the first call has
// any effect; later calls (and calls on a nil Manager) return immediately.
//
// The dispatcher runs under a cancellable child of ctx, so it stops either
// when ctx is cancelled or when the stored cancel function is invoked. A nil
// ctx is treated as context.Background().
func (m *Manager) Start(ctx context.Context) {
	if m == nil {
		return
	}

	launch := func() {
		parent := ctx
		if parent == nil {
			parent = context.Background()
		}
		workerCtx, stop := context.WithCancel(parent)
		// Keep the cancel function so the dispatcher can be shut down later.
		m.cancel = stop
		go m.run(workerCtx)
	}
	m.once.Do(launch)
}
// Stop shuts the dispatcher down. It is safe to call on a nil Manager, safe to
// call more than once, and safe to call concurrently with Start and Publish.
//
// Stop cancels the dispatcher's context; the dispatcher then delivers whatever
// is still buffered in the queue and exits. Stop does not wait for that final
// drain to complete. If the dispatcher was never started, Stop prevents any
// later Start (including the implicit one in Publish) from launching it.
//
// The queue channel is deliberately NOT closed: Publish may be sending on it
// from other goroutines, and a send on a closed channel panics. Records
// published after Stop are therefore never delivered; they stay in the buffer
// until it is full and are dropped after that.
func (m *Manager) Stop() {
	if m == nil {
		return
	}
	m.stopOnce.Do(func() {
		// Consume the start gate first. If Start is running concurrently this
		// waits for it to finish, which makes its write to m.cancel visible
		// here; if Start never ran, this guarantees no dispatcher is launched
		// after shutdown.
		m.once.Do(func() {})

		if m.cancel != nil {
			m.cancel()
		}
	})
}
// Register adds plugin to the end of the delivery list. Nil plugins and calls
// on a nil Manager are ignored. The list is updated under the plugins lock.
func (m *Manager) Register(plugin Plugin) {
	if m == nil {
		return
	}
	if plugin == nil {
		return
	}

	m.pluginsMu.Lock()
	defer m.pluginsMu.Unlock()
	m.plugins = append(m.plugins, plugin)
}
// Publish hands a usage record to the manager for asynchronous delivery. The
// call never blocks: when the queue has no free slot the record is dropped and
// a debug message is logged. If no plugin is registered the record is
// discarded when it is dispatched. Calls on a nil Manager are ignored.
//
// ctx is stored alongside the record and forwarded to plugins unchanged.
func (m *Manager) Publish(ctx context.Context, record Record) {
	if m == nil {
		return
	}

	// Lazily launch the dispatcher so publishing works even when Start was
	// never called explicitly.
	m.Start(context.Background())

	entry := queueItem{ctx: ctx, record: record}
	select {
	case m.queue <- entry:
		return
	default:
	}

	// No free slot: drop rather than block the caller's runtime path.
	log.Debugf("usage: queue full, dropping record for provider %s", record.Provider)
}
// run is the dispatcher loop executed on the background goroutine. It delivers
// queued items until ctx is cancelled (in which case it flushes whatever is
// currently buffered and returns) or the queue channel is closed.
func (m *Manager) run(ctx context.Context) {
	cancelled := ctx.Done()
	for {
		select {
		case <-cancelled:
			m.drain()
			return
		case next, open := <-m.queue:
			if !open {
				return
			}
			m.dispatch(next)
		}
	}
}
// drain delivers every item that is immediately available on the queue and
// returns as soon as the queue is empty or closed. It never blocks waiting
// for new items.
func (m *Manager) drain() {
	for {
		select {
		case pending, open := <-m.queue:
			if open {
				m.dispatch(pending)
				continue
			}
			return
		default:
			// Nothing buffered right now.
			return
		}
	}
}
// dispatch delivers one queued item to every registered plugin, in
// registration order. The plugin list is copied under the read lock so that
// plugin callbacks run without holding it.
func (m *Manager) dispatch(item queueItem) {
	m.pluginsMu.RLock()
	snapshot := append([]Plugin(nil), m.plugins...)
	m.pluginsMu.RUnlock()

	for _, p := range snapshot {
		if p != nil {
			safeInvoke(p, item.ctx, item.record)
		}
	}
}
// safeInvoke calls plugin.HandleUsage and converts a panic raised by the
// plugin into an error log entry, so one misbehaving plugin cannot take down
// the dispatcher goroutine.
func safeInvoke(plugin Plugin, ctx context.Context, record Record) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Errorf("usage: plugin panic recovered: %v", recovered)
	}()

	plugin.HandleUsage(ctx, record)
}
// defaultManager is the package-level Manager returned by DefaultManager. It
// is created at package initialisation with a requested queue buffer of 512.
var defaultManager = NewManager(512)
// DefaultManager exposes the package-wide usage manager shared by the
// package-level helper functions.
func DefaultManager() *Manager {
	return defaultManager
}
// RegisterPlugin adds plugin to the default manager's delivery list.
func RegisterPlugin(plugin Plugin) {
	mgr := DefaultManager()
	mgr.Register(plugin)
}
// PublishRecord enqueues record on the default manager, forwarding ctx
// unchanged.
func PublishRecord(ctx context.Context, record Record) {
	mgr := DefaultManager()
	mgr.Publish(ctx, record)
}
// StartDefault launches the default manager's dispatcher under ctx.
func StartDefault(ctx context.Context) {
	mgr := DefaultManager()
	mgr.Start(ctx)
}
// StopDefault shuts down the default manager's dispatcher.
func StopDefault() {
	mgr := DefaultManager()
	mgr.Stop()
}