diff --git a/internal/api/handlers/management/handler.go b/internal/api/handlers/management/handler.go index d3ccbda6..613c9841 100644 --- a/internal/api/handlers/management/handler.go +++ b/internal/api/handlers/management/handler.go @@ -24,8 +24,15 @@ import ( type attemptInfo struct { count int blockedUntil time.Time + lastActivity time.Time // track last activity for cleanup } +// attemptCleanupInterval controls how often stale IP entries are purged +const attemptCleanupInterval = 1 * time.Hour + +// attemptMaxIdleTime controls how long an IP can be idle before cleanup +const attemptMaxIdleTime = 2 * time.Hour + // Handler aggregates config reference, persistence path and helpers. type Handler struct { cfg *config.Config @@ -47,7 +54,7 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man envSecret, _ := os.LookupEnv("MANAGEMENT_PASSWORD") envSecret = strings.TrimSpace(envSecret) - return &Handler{ + h := &Handler{ cfg: cfg, configFilePath: configFilePath, failedAttempts: make(map[string]*attemptInfo), @@ -57,6 +64,38 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man allowRemoteOverride: envSecret != "", envSecret: envSecret, } + h.startAttemptCleanup() + return h +} + +// startAttemptCleanup launches a background goroutine that periodically +// removes stale IP entries from failedAttempts to prevent memory leaks. +func (h *Handler) startAttemptCleanup() { + go func() { + ticker := time.NewTicker(attemptCleanupInterval) + defer ticker.Stop() + for range ticker.C { + h.purgeStaleAttempts() + } + }() +} + +// purgeStaleAttempts removes IP entries that have been idle beyond attemptMaxIdleTime +// and whose ban (if any) has expired. +func (h *Handler) purgeStaleAttempts() { + now := time.Now() + h.attemptsMu.Lock() + defer h.attemptsMu.Unlock() + for ip, ai := range h.failedAttempts { + // Skip if still banned + if !ai.blockedUntil.IsZero() && now.Before(ai.blockedUntil) { + continue + } + // Remove if idle too long + if now.Sub(ai.lastActivity) > attemptMaxIdleTime { + delete(h.failedAttempts, ip) + } + } } // NewHandler creates a new management handler instance. @@ -149,6 +188,7 @@ func (h *Handler) Middleware() gin.HandlerFunc { h.failedAttempts[clientIP] = aip } aip.count++ + aip.lastActivity = time.Now() if aip.count >= maxFailures { aip.blockedUntil = time.Now().Add(banDuration) aip.count = 0 diff --git a/internal/cache/signature_cache.go b/internal/cache/signature_cache.go index c1326629..d4a864e0 100644 --- a/internal/cache/signature_cache.go +++ b/internal/cache/signature_cache.go @@ -26,11 +26,17 @@ const ( // MinValidSignatureLen is the minimum length for a signature to be considered valid MinValidSignatureLen = 50 + + // SessionCleanupInterval controls how often stale sessions are purged + SessionCleanupInterval = 10 * time.Minute ) // signatureCache stores signatures by sessionId -> textHash -> SignatureEntry var signatureCache sync.Map +// sessionCleanupOnce ensures the background cleanup goroutine starts only once +var sessionCleanupOnce sync.Once + // sessionCache is the inner map type type sessionCache struct { mu sync.RWMutex @@ -45,6 +51,9 @@ func hashText(text string) string { // getOrCreateSession gets or creates a session cache func getOrCreateSession(sessionID string) *sessionCache { + // Start background cleanup on first access + sessionCleanupOnce.Do(startSessionCleanup) + if val, ok := signatureCache.Load(sessionID); ok { return val.(*sessionCache) } @@ -53,6 +62,40 @@ func getOrCreateSession(sessionID string) *sessionCache { return actual.(*sessionCache) } +// startSessionCleanup launches a background goroutine that periodically +// removes sessions where all entries have expired. +func startSessionCleanup() { + go func() { + ticker := time.NewTicker(SessionCleanupInterval) + defer ticker.Stop() + for range ticker.C { + purgeExpiredSessions() + } + }() +} + +// purgeExpiredSessions removes sessions with no valid (non-expired) entries. +func purgeExpiredSessions() { + now := time.Now() + signatureCache.Range(func(key, value any) bool { + sc := value.(*sessionCache) + sc.mu.Lock() + // Remove expired entries + for k, entry := range sc.entries { + if now.Sub(entry.Timestamp) > SignatureCacheTTL { + delete(sc.entries, k) + } + } + isEmpty := len(sc.entries) == 0 + sc.mu.Unlock() + // Remove session if empty + if isEmpty { + signatureCache.Delete(key) + } + return true + }) +} + // CacheSignature stores a thinking signature for a given session and text. // Used for Claude models that require signed thinking blocks in multi-turn conversations. func CacheSignature(sessionID, text, signature string) { diff --git a/internal/runtime/executor/cache_helpers.go b/internal/runtime/executor/cache_helpers.go index 5272686b..b6de886d 100644 --- a/internal/runtime/executor/cache_helpers.go +++ b/internal/runtime/executor/cache_helpers.go @@ -1,10 +1,68 @@ package executor -import "time" +import ( + "sync" + "time" +) type codexCache struct { ID string Expire time.Time } -var codexCacheMap = map[string]codexCache{} +// codexCacheMap stores prompt cache IDs keyed by model+user_id. +// Protected by codexCacheMu. Entries expire after 1 hour. +var ( + codexCacheMap = make(map[string]codexCache) + codexCacheMu sync.RWMutex +) + +// codexCacheCleanupInterval controls how often expired entries are purged. +const codexCacheCleanupInterval = 15 * time.Minute + +// codexCacheCleanupOnce ensures the background cleanup goroutine starts only once. +var codexCacheCleanupOnce sync.Once + +// startCodexCacheCleanup launches a background goroutine that periodically +// removes expired entries from codexCacheMap to prevent memory leaks. +func startCodexCacheCleanup() { + go func() { + ticker := time.NewTicker(codexCacheCleanupInterval) + defer ticker.Stop() + for range ticker.C { + purgeExpiredCodexCache() + } + }() +} + +// purgeExpiredCodexCache removes entries that have expired. +func purgeExpiredCodexCache() { + now := time.Now() + codexCacheMu.Lock() + defer codexCacheMu.Unlock() + for key, cache := range codexCacheMap { + if cache.Expire.Before(now) { + delete(codexCacheMap, key) + } + } +} + +// getCodexCache retrieves a cached entry, returning ok=false if not found or expired. +func getCodexCache(key string) (codexCache, bool) { + codexCacheCleanupOnce.Do(startCodexCacheCleanup) + codexCacheMu.RLock() + cache, ok := codexCacheMap[key] + codexCacheMu.RUnlock() + if !ok || cache.Expire.Before(time.Now()) { + return codexCache{}, false + } + return cache, true +} + +// setCodexCache stores a cache entry. +func setCodexCache(key string, cache codexCache) { + codexCacheCleanupOnce.Do(startCodexCacheCleanup) + codexCacheMu.Lock() + codexCacheMap[key] = cache + codexCacheMu.Unlock() +} diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 0788e4f1..8e7c8df9 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -457,14 +457,14 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Form if from == "claude" { userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") if userIDResult.Exists() { - var hasKey bool key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String()) - if cache, hasKey = codexCacheMap[key]; !hasKey || cache.Expire.Before(time.Now()) { + var ok bool + if cache, ok = getCodexCache(key); !ok { cache = codexCache{ ID: uuid.New().String(), Expire: time.Now().Add(1 * time.Hour), } - codexCacheMap[key] = cache + setCodexCache(key, cache) } } } else if from == "openai-response" { diff --git a/internal/usage/logger_plugin.go b/internal/usage/logger_plugin.go index e4371e8d..38177d7d 100644 --- a/internal/usage/logger_plugin.go +++ b/internal/usage/logger_plugin.go @@ -87,6 +87,10 @@ type modelStats struct { Details []RequestDetail } +// maxDetailsPerModel limits the number of request details retained per model +// to prevent unbounded memory growth. Oldest entries are dropped when exceeded. +const maxDetailsPerModel = 1000 + // RequestDetail stores the timestamp and token usage for a single request. type RequestDetail struct { Timestamp time.Time `json:"timestamp"` @@ -221,6 +225,11 @@ func (s *RequestStatistics) updateAPIStats(stats *apiStats, model string, detail modelStatsValue.TotalRequests++ modelStatsValue.TotalTokens += detail.Tokens.TotalTokens modelStatsValue.Details = append(modelStatsValue.Details, detail) + // Prevent unbounded growth by dropping oldest entries when limit exceeded + if len(modelStatsValue.Details) > maxDetailsPerModel { + excess := len(modelStatsValue.Details) - maxDetailsPerModel + modelStatsValue.Details = modelStatsValue.Details[excess:] + } } // Snapshot returns a copy of the aggregated metrics for external consumption.