Merge pull request #947 from pykancha/fix-memory-leak

Resolve memory leaks causing OOM in k8s deployment
This commit is contained in:
Luis Pater
2026-01-10 00:40:47 +08:00
committed by GitHub
5 changed files with 156 additions and 6 deletions

View File

@@ -24,8 +24,15 @@ import (
type attemptInfo struct { type attemptInfo struct {
count int count int
blockedUntil time.Time blockedUntil time.Time
lastActivity time.Time // track last activity for cleanup
} }
// Tuning knobs for the background sweep of the failed-attempt table.
const (
	// attemptCleanupInterval is the period between two sweeps for stale IP entries.
	attemptCleanupInterval = time.Hour

	// attemptMaxIdleTime is how long an IP entry may go without activity
	// before a sweep is allowed to remove it.
	attemptMaxIdleTime = 2 * time.Hour
)
// Handler aggregates config reference, persistence path and helpers. // Handler aggregates config reference, persistence path and helpers.
type Handler struct { type Handler struct {
cfg *config.Config cfg *config.Config
@@ -47,7 +54,7 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man
envSecret, _ := os.LookupEnv("MANAGEMENT_PASSWORD") envSecret, _ := os.LookupEnv("MANAGEMENT_PASSWORD")
envSecret = strings.TrimSpace(envSecret) envSecret = strings.TrimSpace(envSecret)
return &Handler{ h := &Handler{
cfg: cfg, cfg: cfg,
configFilePath: configFilePath, configFilePath: configFilePath,
failedAttempts: make(map[string]*attemptInfo), failedAttempts: make(map[string]*attemptInfo),
@@ -57,6 +64,38 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man
allowRemoteOverride: envSecret != "", allowRemoteOverride: envSecret != "",
envSecret: envSecret, envSecret: envSecret,
} }
h.startAttemptCleanup()
return h
}
// startAttemptCleanup spawns the janitor goroutine for failedAttempts: once
// every attemptCleanupInterval it calls purgeStaleAttempts so that entries for
// idle IPs do not pile up.
//
// NOTE(review): nothing in this function ever stops the loop, so the goroutine
// appears to live for the rest of the process and to keep h reachable —
// confirm that a Handler is only constructed once per process.
func (h *Handler) startAttemptCleanup() {
	sweepLoop := func() {
		tick := time.NewTicker(attemptCleanupInterval)
		defer tick.Stop()

		// One sweep per tick.
		for range tick.C {
			h.purgeStaleAttempts()
		}
	}
	go sweepLoop()
}
// purgeStaleAttempts removes IP entries that have been idle beyond attemptMaxIdleTime
// and whose ban (if any) has expired.
func (h *Handler) purgeStaleAttempts() {
now := time.Now()
h.attemptsMu.Lock()
defer h.attemptsMu.Unlock()
for ip, ai := range h.failedAttempts {
// Skip if still banned
if !ai.blockedUntil.IsZero() && now.Before(ai.blockedUntil) {
continue
}
// Remove if idle too long
if now.Sub(ai.lastActivity) > attemptMaxIdleTime {
delete(h.failedAttempts, ip)
}
}
} }
// NewHandler creates a new management handler instance. // NewHandler creates a new management handler instance.
@@ -149,6 +188,7 @@ func (h *Handler) Middleware() gin.HandlerFunc {
h.failedAttempts[clientIP] = aip h.failedAttempts[clientIP] = aip
} }
aip.count++ aip.count++
aip.lastActivity = time.Now()
if aip.count >= maxFailures { if aip.count >= maxFailures {
aip.blockedUntil = time.Now().Add(banDuration) aip.blockedUntil = time.Now().Add(banDuration)
aip.count = 0 aip.count = 0

View File

@@ -26,11 +26,17 @@ const (
// MinValidSignatureLen is the minimum length for a signature to be considered valid // MinValidSignatureLen is the minimum length for a signature to be considered valid
MinValidSignatureLen = 50 MinValidSignatureLen = 50
// SessionCleanupInterval controls how often stale sessions are purged
SessionCleanupInterval = 10 * time.Minute
) )
// signatureCache stores signatures by sessionId -> textHash -> SignatureEntry // signatureCache stores signatures by sessionId -> textHash -> SignatureEntry
var signatureCache sync.Map var signatureCache sync.Map
// sessionCleanupOnce guards startSessionCleanup so the background cleanup
// goroutine is launched at most once; getOrCreateSession triggers it on its
// first call.
var sessionCleanupOnce sync.Once
// sessionCache is the inner map type // sessionCache is the inner map type
type sessionCache struct { type sessionCache struct {
mu sync.RWMutex mu sync.RWMutex
@@ -45,6 +51,9 @@ func hashText(text string) string {
// getOrCreateSession gets or creates a session cache // getOrCreateSession gets or creates a session cache
func getOrCreateSession(sessionID string) *sessionCache { func getOrCreateSession(sessionID string) *sessionCache {
// Start background cleanup on first access
sessionCleanupOnce.Do(startSessionCleanup)
if val, ok := signatureCache.Load(sessionID); ok { if val, ok := signatureCache.Load(sessionID); ok {
return val.(*sessionCache) return val.(*sessionCache)
} }
@@ -53,6 +62,40 @@ func getOrCreateSession(sessionID string) *sessionCache {
return actual.(*sessionCache) return actual.(*sessionCache)
} }
// startSessionCleanup spawns a goroutine that calls purgeExpiredSessions once
// every SessionCleanupInterval. The loop has no exit condition, so callers are
// expected to start it only once (getOrCreateSession does so through
// sessionCleanupOnce).
func startSessionCleanup() {
	sweepLoop := func() {
		tick := time.NewTicker(SessionCleanupInterval)
		defer tick.Stop()

		// One purge per tick.
		for range tick.C {
			purgeExpiredSessions()
		}
	}
	go sweepLoop()
}
// purgeExpiredSessions walks signatureCache and, for each session, deletes the
// entries older than SignatureCacheTTL. A session left with no entries is then
// removed from signatureCache as well.
//
// NOTE(review): the session lock is released before the session is deleted
// from signatureCache, so an entry cached in that window could be dropped
// together with the session — confirm this is acceptable for callers.
func purgeExpiredSessions() {
	now := time.Now()

	signatureCache.Range(func(sessionID, cached any) bool {
		session := cached.(*sessionCache)

		// Evict stale signatures while holding the per-session write lock.
		session.mu.Lock()
		for textHash, entry := range session.entries {
			if now.Sub(entry.Timestamp) > SignatureCacheTTL {
				delete(session.entries, textHash)
			}
		}
		remaining := len(session.entries)
		session.mu.Unlock()

		// Nothing left: forget the session itself.
		if remaining == 0 {
			signatureCache.Delete(sessionID)
		}

		// Always continue with the next session.
		return true
	})
}
// CacheSignature stores a thinking signature for a given session and text. // CacheSignature stores a thinking signature for a given session and text.
// Used for Claude models that require signed thinking blocks in multi-turn conversations. // Used for Claude models that require signed thinking blocks in multi-turn conversations.
func CacheSignature(sessionID, text, signature string) { func CacheSignature(sessionID, text, signature string) {

View File

@@ -1,10 +1,68 @@
package executor package executor
import "time" import (
"sync"
"time"
)
type codexCache struct { type codexCache struct {
ID string ID string
Expire time.Time Expire time.Time
} }
var codexCacheMap = map[string]codexCache{} // codexCacheMap stores prompt cache IDs keyed by model+user_id.
// Protected by codexCacheMu. Entries expire after 1 hour.
var (
codexCacheMap = make(map[string]codexCache)
codexCacheMu sync.RWMutex
)
// codexCacheCleanupInterval is the period between two runs of
// purgeExpiredCodexCache in the background cleanup goroutine.
const codexCacheCleanupInterval = 15 * time.Minute
// codexCacheCleanupOnce ensures the background cleanup goroutine starts only
// once; getCodexCache and setCodexCache both trigger it before touching the map.
var codexCacheCleanupOnce sync.Once
// startCodexCacheCleanup spawns a goroutine that calls purgeExpiredCodexCache
// once every codexCacheCleanupInterval, so expired entries do not stay in
// codexCacheMap indefinitely. The loop has no exit condition; it is meant to
// be started a single time via codexCacheCleanupOnce.
func startCodexCacheCleanup() {
	sweepLoop := func() {
		tick := time.NewTicker(codexCacheCleanupInterval)
		defer tick.Stop()

		// One purge per tick.
		for range tick.C {
			purgeExpiredCodexCache()
		}
	}
	go sweepLoop()
}
// purgeExpiredCodexCache deletes every codexCacheMap entry whose Expire time
// lies strictly before the moment the purge started. The pass holds the
// codexCacheMu write lock throughout.
func purgeExpiredCodexCache() {
	now := time.Now()

	codexCacheMu.Lock()
	defer codexCacheMu.Unlock()

	for cacheKey, entry := range codexCacheMap {
		if now.After(entry.Expire) {
			delete(codexCacheMap, cacheKey)
		}
	}
}
// getCodexCache looks up key in codexCacheMap under the read lock.
//
// It returns the stored entry and true only when the key exists and its Expire
// time has not yet passed; otherwise it returns the zero codexCache and false.
// An expired entry is reported as a miss but is not removed here. The first
// call also starts the background cleanup goroutine.
func getCodexCache(key string) (codexCache, bool) {
	codexCacheCleanupOnce.Do(startCodexCacheCleanup)

	codexCacheMu.RLock()
	entry, found := codexCacheMap[key]
	codexCacheMu.RUnlock()

	if found && !entry.Expire.Before(time.Now()) {
		return entry, true
	}
	return codexCache{}, false
}
// setCodexCache writes cache into codexCacheMap under key, replacing any
// existing entry, while holding the codexCacheMu write lock. The first call
// also starts the background cleanup goroutine.
func setCodexCache(key string, cache codexCache) {
	codexCacheCleanupOnce.Do(startCodexCacheCleanup)

	codexCacheMu.Lock()
	defer codexCacheMu.Unlock()
	codexCacheMap[key] = cache
}

View File

@@ -457,14 +457,14 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Form
if from == "claude" { if from == "claude" {
userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id")
if userIDResult.Exists() { if userIDResult.Exists() {
var hasKey bool
key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String()) key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String())
if cache, hasKey = codexCacheMap[key]; !hasKey || cache.Expire.Before(time.Now()) { var ok bool
if cache, ok = getCodexCache(key); !ok {
cache = codexCache{ cache = codexCache{
ID: uuid.New().String(), ID: uuid.New().String(),
Expire: time.Now().Add(1 * time.Hour), Expire: time.Now().Add(1 * time.Hour),
} }
codexCacheMap[key] = cache setCodexCache(key, cache)
} }
} }
} else if from == "openai-response" { } else if from == "openai-response" {

View File

@@ -87,6 +87,10 @@ type modelStats struct {
Details []RequestDetail Details []RequestDetail
} }
// maxDetailsPerModel caps how many RequestDetail records are kept per model.
// When an append pushes the slice past this limit, updateAPIStats drops the
// oldest entries from the front so memory use stays bounded.
const maxDetailsPerModel = 1000
// RequestDetail stores the timestamp and token usage for a single request. // RequestDetail stores the timestamp and token usage for a single request.
type RequestDetail struct { type RequestDetail struct {
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
@@ -221,6 +225,11 @@ func (s *RequestStatistics) updateAPIStats(stats *apiStats, model string, detail
modelStatsValue.TotalRequests++ modelStatsValue.TotalRequests++
modelStatsValue.TotalTokens += detail.Tokens.TotalTokens modelStatsValue.TotalTokens += detail.Tokens.TotalTokens
modelStatsValue.Details = append(modelStatsValue.Details, detail) modelStatsValue.Details = append(modelStatsValue.Details, detail)
// Prevent unbounded growth by dropping oldest entries when limit exceeded
if len(modelStatsValue.Details) > maxDetailsPerModel {
excess := len(modelStatsValue.Details) - maxDetailsPerModel
modelStatsValue.Details = modelStatsValue.Details[excess:]
}
} }
// Snapshot returns a copy of the aggregated metrics for external consumption. // Snapshot returns a copy of the aggregated metrics for external consumption.