From 19609db13cec1bb1822e48ae3a89b53bba8fbe34 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Thu, 25 Sep 2025 01:06:02 +0800 Subject: [PATCH] feat(usage): add in-memory usage statistics tracking and API endpoint - Introduced in-memory request statistics aggregation in `LoggerPlugin`. - Added new structures for detailed metrics collection (e.g., token breakdown, request success/failure). - Implemented `/usage` management API endpoint for retrieving aggregated statistics. - Updated management handlers to support the new usage statistics functionality. - Enhanced documentation to describe the usage metrics API. --- MANAGEMENT_API.md | 55 ++++ MANAGEMENT_API_CN.md | 55 ++++ internal/api/handlers/management/handler.go | 13 +- internal/api/handlers/management/usage.go | 17 ++ internal/api/server.go | 1 + internal/usage/logger_plugin.go | 308 +++++++++++++++++++- sdk/cliproxy/service.go | 25 -- 7 files changed, 434 insertions(+), 40 deletions(-) create mode 100644 internal/api/handlers/management/usage.go diff --git a/MANAGEMENT_API.md b/MANAGEMENT_API.md index a456c00f..7cd5a00a 100644 --- a/MANAGEMENT_API.md +++ b/MANAGEMENT_API.md @@ -32,6 +32,61 @@ If a plaintext key is detected in the config at startup, it will be bcrypt‑has ## Endpoints +### Usage Statistics +- GET `/usage` — Retrieve aggregated in-memory request metrics + - Response: + ```json + { + "usage": { + "total_requests": 24, + "success_count": 22, + "failure_count": 2, + "total_tokens": 13890, + "requests_by_day": { + "2024-05-20": 12 + }, + "requests_by_hour": { + "09": 4, + "18": 8 + }, + "tokens_by_day": { + "2024-05-20": 9876 + }, + "tokens_by_hour": { + "09": 1234, + "18": 865 + }, + "apis": { + "POST /v1/chat/completions": { + "total_requests": 12, + "total_tokens": 9021, + "models": { + "gpt-4o-mini": { + "total_requests": 8, + "total_tokens": 7123, + "details": [ + { + "timestamp": "2024-05-20T09:15:04.123456Z", + "tokens": { + "input_tokens": 523, + "output_tokens": 308, + "reasoning_tokens": 0, + "cached_tokens": 0, + "total_tokens": 831 + } + } + ] + } + } + } + } + } + } + ``` + - Notes: + - Statistics are recalculated for every request that reports token usage; data resets when the server restarts. + - Hourly counters fold all days into the same hour bucket (`00`–`23`). + ### Config - GET `/config` — Get the full config - Request: diff --git a/MANAGEMENT_API_CN.md b/MANAGEMENT_API_CN.md index 687de194..803d9d41 100644 --- a/MANAGEMENT_API_CN.md +++ b/MANAGEMENT_API_CN.md @@ -32,6 +32,61 @@ ## 端点说明 +### Usage(请求统计) +- GET `/usage` — 获取内存中的请求统计 + - 响应: + ```json + { + "usage": { + "total_requests": 24, + "success_count": 22, + "failure_count": 2, + "total_tokens": 13890, + "requests_by_day": { + "2024-05-20": 12 + }, + "requests_by_hour": { + "09": 4, + "18": 8 + }, + "tokens_by_day": { + "2024-05-20": 9876 + }, + "tokens_by_hour": { + "09": 1234, + "18": 865 + }, + "apis": { + "POST /v1/chat/completions": { + "total_requests": 12, + "total_tokens": 9021, + "models": { + "gpt-4o-mini": { + "total_requests": 8, + "total_tokens": 7123, + "details": [ + { + "timestamp": "2024-05-20T09:15:04.123456Z", + "tokens": { + "input_tokens": 523, + "output_tokens": 308, + "reasoning_tokens": 0, + "cached_tokens": 0, + "total_tokens": 831 + } + } + ] + } + } + } + } + } + } + ``` + - 说明: + - 仅统计带有 token 使用信息的请求,服务重启后数据会被清空。 + - 小时维度会将所有日期折叠到 `00`–`23` 的统一小时桶中。 + ### Config - GET `/config` — 获取完整的配置 - 请求: diff --git a/internal/api/handlers/management/handler.go b/internal/api/handlers/management/handler.go index bb999fee..a90d44e2 100644 --- a/internal/api/handlers/management/handler.go +++ b/internal/api/handlers/management/handler.go @@ -11,6 +11,7 @@ import ( "github.com/gin-gonic/gin" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" + "github.com/router-for-me/CLIProxyAPI/v6/internal/usage" coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" "golang.org/x/crypto/bcrypt" ) @@ -29,11 +30,18 @@ type Handler struct { attemptsMu sync.Mutex failedAttempts map[string]*attemptInfo // keyed by client IP authManager *coreauth.Manager + usageStats *usage.RequestStatistics } // NewHandler creates a new management handler instance. func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Manager) *Handler { - return &Handler{cfg: cfg, configFilePath: configFilePath, failedAttempts: make(map[string]*attemptInfo), authManager: manager} + return &Handler{ + cfg: cfg, + configFilePath: configFilePath, + failedAttempts: make(map[string]*attemptInfo), + authManager: manager, + usageStats: usage.GetRequestStatistics(), + } } // SetConfig updates the in-memory config reference when the server hot-reloads. @@ -42,6 +50,9 @@ func (h *Handler) SetConfig(cfg *config.Config) { h.cfg = cfg } // SetAuthManager updates the auth manager reference used by management endpoints. func (h *Handler) SetAuthManager(manager *coreauth.Manager) { h.authManager = manager } +// SetUsageStatistics allows replacing the usage statistics reference. +func (h *Handler) SetUsageStatistics(stats *usage.RequestStatistics) { h.usageStats = stats } + // Middleware enforces access control for management endpoints. // All requests (local and remote) require a valid management key. // Additionally, remote access requires allow-remote-management=true. diff --git a/internal/api/handlers/management/usage.go b/internal/api/handlers/management/usage.go new file mode 100644 index 00000000..37a2d97b --- /dev/null +++ b/internal/api/handlers/management/usage.go @@ -0,0 +1,17 @@ +package management + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/router-for-me/CLIProxyAPI/v6/internal/usage" +) + +// GetUsageStatistics returns the in-memory request statistics snapshot. +func (h *Handler) GetUsageStatistics(c *gin.Context) { + var snapshot usage.StatisticsSnapshot + if h != nil && h.usageStats != nil { + snapshot = h.usageStats.Snapshot() + } + c.JSON(http.StatusOK, gin.H{"usage": snapshot}) +} diff --git a/internal/api/server.go b/internal/api/server.go index bea114a9..f96c8be2 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -271,6 +271,7 @@ func (s *Server) setupRoutes() { mgmt := s.engine.Group("/v0/management") mgmt.Use(s.mgmt.Middleware()) { + mgmt.GET("/usage", s.mgmt.GetUsageStatistics) mgmt.GET("/config", s.mgmt.GetConfig) mgmt.GET("/debug", s.mgmt.GetDebug) diff --git a/internal/usage/logger_plugin.go b/internal/usage/logger_plugin.go index 69a2dfa5..2ed49575 100644 --- a/internal/usage/logger_plugin.go +++ b/internal/usage/logger_plugin.go @@ -5,36 +5,316 @@ package usage import ( "context" - "encoding/json" + "fmt" + "sync" + "time" + "github.com/gin-gonic/gin" coreusage "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" - log "github.com/sirupsen/logrus" ) func init() { coreusage.RegisterPlugin(NewLoggerPlugin()) } -// LoggerPlugin outputs every usage record to the application log. -// It implements the coreusage.Plugin interface to provide usage tracking -// and logging capabilities for monitoring API consumption. -type LoggerPlugin struct{} +// LoggerPlugin collects in-memory request statistics for usage analysis. +// It implements coreusage.Plugin to receive usage records emitted by the runtime. +type LoggerPlugin struct { + stats *RequestStatistics +} // NewLoggerPlugin constructs a new logger plugin instance. // // Returns: -// - *LoggerPlugin: A new logger plugin instance -func NewLoggerPlugin() *LoggerPlugin { return &LoggerPlugin{} } +// - *LoggerPlugin: A new logger plugin instance wired to the shared statistics store. +func NewLoggerPlugin() *LoggerPlugin { return &LoggerPlugin{stats: defaultRequestStatistics} } // HandleUsage implements coreusage.Plugin. -// It processes usage records by marshaling them to JSON and logging them -// at debug level for observability purposes. +// It updates the in-memory statistics store whenever a usage record is received. // // Parameters: // - ctx: The context for the usage record -// - record: The usage record to process and log +// - record: The usage record to aggregate func (p *LoggerPlugin) HandleUsage(ctx context.Context, record coreusage.Record) { - // Output all relevant fields for observability; keep logging lightweight and non-blocking. - data, _ := json.Marshal(record) - log.Debug(string(data)) + if p == nil || p.stats == nil { + return + } + p.stats.Record(ctx, record) +} + +// RequestStatistics maintains aggregated request metrics in memory. +type RequestStatistics struct { + mu sync.RWMutex + + totalRequests int64 + successCount int64 + failureCount int64 + totalTokens int64 + + apis map[string]*apiStats + + requestsByDay map[string]int64 + requestsByHour map[int]int64 + tokensByDay map[string]int64 + tokensByHour map[int]int64 +} + +// apiStats holds aggregated metrics for a single API key. +type apiStats struct { + TotalRequests int64 + TotalTokens int64 + Models map[string]*modelStats +} + +// modelStats holds aggregated metrics for a specific model within an API. +type modelStats struct { + TotalRequests int64 + TotalTokens int64 + Details []RequestDetail +} + +// RequestDetail stores the timestamp and token usage for a single request. +type RequestDetail struct { + Timestamp time.Time `json:"timestamp"` + Tokens TokenStats `json:"tokens"` +} + +// TokenStats captures the token usage breakdown for a request. +type TokenStats struct { + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` + ReasoningTokens int64 `json:"reasoning_tokens"` + CachedTokens int64 `json:"cached_tokens"` + TotalTokens int64 `json:"total_tokens"` +} + +// StatisticsSnapshot represents an immutable view of the aggregated metrics. +type StatisticsSnapshot struct { + TotalRequests int64 `json:"total_requests"` + SuccessCount int64 `json:"success_count"` + FailureCount int64 `json:"failure_count"` + TotalTokens int64 `json:"total_tokens"` + + APIs map[string]APISnapshot `json:"apis"` + + RequestsByDay map[string]int64 `json:"requests_by_day"` + RequestsByHour map[string]int64 `json:"requests_by_hour"` + TokensByDay map[string]int64 `json:"tokens_by_day"` + TokensByHour map[string]int64 `json:"tokens_by_hour"` +} + +// APISnapshot summarises metrics for a single API key. +type APISnapshot struct { + TotalRequests int64 `json:"total_requests"` + TotalTokens int64 `json:"total_tokens"` + Models map[string]ModelSnapshot `json:"models"` +} + +// ModelSnapshot summarises metrics for a specific model. +type ModelSnapshot struct { + TotalRequests int64 `json:"total_requests"` + TotalTokens int64 `json:"total_tokens"` + Details []RequestDetail `json:"details"` +} + +var defaultRequestStatistics = NewRequestStatistics() + +// GetRequestStatistics returns the shared statistics store. +func GetRequestStatistics() *RequestStatistics { return defaultRequestStatistics } + +// NewRequestStatistics constructs an empty statistics store. +func NewRequestStatistics() *RequestStatistics { + return &RequestStatistics{ + apis: make(map[string]*apiStats), + requestsByDay: make(map[string]int64), + requestsByHour: make(map[int]int64), + tokensByDay: make(map[string]int64), + tokensByHour: make(map[int]int64), + } +} + +// Record ingests a new usage record and updates the aggregates. +func (s *RequestStatistics) Record(ctx context.Context, record coreusage.Record) { + if s == nil { + return + } + timestamp := record.RequestedAt + if timestamp.IsZero() { + timestamp = time.Now() + } + detail := normaliseDetail(record.Detail) + totalTokens := detail.TotalTokens + statsKey := record.APIKey + if statsKey == "" { + statsKey = resolveAPIIdentifier(ctx, record) + } + success := resolveSuccess(ctx) + modelName := record.Model + if modelName == "" { + modelName = "unknown" + } + dayKey := timestamp.Format("2006-01-02") + hourKey := timestamp.Hour() + + s.mu.Lock() + defer s.mu.Unlock() + + s.totalRequests++ + if success { + s.successCount++ + } else { + s.failureCount++ + } + s.totalTokens += totalTokens + + stats, ok := s.apis[statsKey] + if !ok { + stats = &apiStats{Models: make(map[string]*modelStats)} + s.apis[statsKey] = stats + } + s.updateAPIStats(stats, modelName, RequestDetail{Timestamp: timestamp, Tokens: detail}) + + s.requestsByDay[dayKey]++ + s.requestsByHour[hourKey]++ + s.tokensByDay[dayKey] += totalTokens + s.tokensByHour[hourKey] += totalTokens +} + +func (s *RequestStatistics) updateAPIStats(stats *apiStats, model string, detail RequestDetail) { + stats.TotalRequests++ + stats.TotalTokens += detail.Tokens.TotalTokens + modelStatsValue, ok := stats.Models[model] + if !ok { + modelStatsValue = &modelStats{} + stats.Models[model] = modelStatsValue + } + modelStatsValue.TotalRequests++ + modelStatsValue.TotalTokens += detail.Tokens.TotalTokens + modelStatsValue.Details = append(modelStatsValue.Details, detail) +} + +// Snapshot returns a copy of the aggregated metrics for external consumption. +func (s *RequestStatistics) Snapshot() StatisticsSnapshot { + result := StatisticsSnapshot{} + if s == nil { + return result + } + + s.mu.RLock() + defer s.mu.RUnlock() + + result.TotalRequests = s.totalRequests + result.SuccessCount = s.successCount + result.FailureCount = s.failureCount + result.TotalTokens = s.totalTokens + + result.APIs = make(map[string]APISnapshot, len(s.apis)) + for apiName, stats := range s.apis { + apiSnapshot := APISnapshot{ + TotalRequests: stats.TotalRequests, + TotalTokens: stats.TotalTokens, + Models: make(map[string]ModelSnapshot, len(stats.Models)), + } + for modelName, modelStatsValue := range stats.Models { + requestDetails := make([]RequestDetail, len(modelStatsValue.Details)) + copy(requestDetails, modelStatsValue.Details) + apiSnapshot.Models[modelName] = ModelSnapshot{ + TotalRequests: modelStatsValue.TotalRequests, + TotalTokens: modelStatsValue.TotalTokens, + Details: requestDetails, + } + } + result.APIs[apiName] = apiSnapshot + } + + result.RequestsByDay = make(map[string]int64, len(s.requestsByDay)) + for k, v := range s.requestsByDay { + result.RequestsByDay[k] = v + } + + result.RequestsByHour = make(map[string]int64, len(s.requestsByHour)) + for hour, v := range s.requestsByHour { + key := formatHour(hour) + result.RequestsByHour[key] = v + } + + result.TokensByDay = make(map[string]int64, len(s.tokensByDay)) + for k, v := range s.tokensByDay { + result.TokensByDay[k] = v + } + + result.TokensByHour = make(map[string]int64, len(s.tokensByHour)) + for hour, v := range s.tokensByHour { + key := formatHour(hour) + result.TokensByHour[key] = v + } + + return result +} + +func resolveAPIIdentifier(ctx context.Context, record coreusage.Record) string { + if ctx != nil { + if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { + path := ginCtx.FullPath() + if path == "" && ginCtx.Request != nil { + path = ginCtx.Request.URL.Path + } + method := "" + if ginCtx.Request != nil { + method = ginCtx.Request.Method + } + if path != "" { + if method != "" { + return method + " " + path + } + return path + } + } + } + if record.Provider != "" { + return record.Provider + } + return "unknown" +} + +func resolveSuccess(ctx context.Context) bool { + if ctx == nil { + return true + } + ginCtx, ok := ctx.Value("gin").(*gin.Context) + if !ok || ginCtx == nil { + return true + } + status := ginCtx.Writer.Status() + if status == 0 { + return true + } + return status < httpStatusBadRequest +} + +const httpStatusBadRequest = 400 + +func normaliseDetail(detail coreusage.Detail) TokenStats { + tokens := TokenStats{ + InputTokens: detail.InputTokens, + OutputTokens: detail.OutputTokens, + ReasoningTokens: detail.ReasoningTokens, + CachedTokens: detail.CachedTokens, + TotalTokens: detail.TotalTokens, + } + if tokens.TotalTokens == 0 { + tokens.TotalTokens = detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens + } + if tokens.TotalTokens == 0 { + tokens.TotalTokens = detail.InputTokens + detail.OutputTokens + detail.ReasoningTokens + detail.CachedTokens + } + return tokens +} + +func formatHour(hour int) string { + if hour < 0 { + hour = 0 + } + hour = hour % 24 + return fmt.Sprintf("%02d", hour) } diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index 40cc32b8..5194608d 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -476,31 +476,6 @@ func (s *Service) ensureAuthDir() error { return nil } -// syncCoreAuthFromAuths registers or updates core auths and disables missing ones. -func (s *Service) syncCoreAuthFromAuths(ctx context.Context, auths []*coreauth.Auth) { - if s.coreManager == nil { - return - } - seen := make(map[string]struct{}, len(auths)) - for _, a := range auths { - if a == nil || a.ID == "" { - continue - } - seen[a.ID] = struct{}{} - s.applyCoreAuthAddOrUpdate(ctx, a) - } - // Disable removed auths - for _, stored := range s.coreManager.List() { - if stored == nil { - continue - } - if _, ok := seen[stored.ID]; ok { - continue - } - s.applyCoreAuthRemoval(ctx, stored.ID) - } -} - // registerModelsForAuth (re)binds provider models in the global registry using the core auth ID as client identifier. func (s *Service) registerModelsForAuth(a *coreauth.Auth) { if a == nil || a.ID == "" {