mirror of https://github.com/router-for-me/CLIProxyAPI.git (synced 2026-02-03 13:00:52 +08:00)
fix(server): resolve memory leaks causing OOM in k8s deployment
- usage/logger_plugin: cap modelStats.Details at 1000 entries per model
- cache/signature_cache: add background cleanup for expired sessions (10 min)
- management/handler: add background cleanup for stale IP rate-limit entries (1 hr)
- executor/cache_helpers: add mutex protection and TTL cleanup for codexCacheMap (15 min)
- executor/codex_executor: use thread-safe cache accessors

Add reproduction tests demonstrating leak behavior before/after fixes.

Amp-Thread-ID: https://ampcode.com/threads/T-019ba0fc-1d7b-7338-8e1d-ca0520412777
Co-authored-by: Amp <amp@ampcode.com>
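All four fixes apply the same two idioms: bound what a collection may retain, and sweep expired entries on a timer, with the sweeper started lazily through sync.Once and the shared map guarded by a mutex. A minimal standalone sketch of the map-with-TTL idiom follows; the type and function names are illustrative, not from the codebase:

package cachesketch

import (
	"sync"
	"time"
)

// entry mirrors the shape of codexCache: a value plus an expiry.
type entry struct {
	value  string
	expire time.Time
}

// ttlCache is a mutex-guarded map whose expired entries are swept by a
// lazily started background goroutine, so it cannot grow without bound.
type ttlCache struct {
	mu      sync.RWMutex
	entries map[string]entry
	janitor sync.Once
}

func newTTLCache() *ttlCache {
	return &ttlCache{entries: make(map[string]entry)}
}

func (c *ttlCache) set(key, value string, ttl time.Duration) {
	c.janitor.Do(c.startJanitor)
	c.mu.Lock()
	c.entries[key] = entry{value: value, expire: time.Now().Add(ttl)}
	c.mu.Unlock()
}

func (c *ttlCache) get(key string) (string, bool) {
	c.janitor.Do(c.startJanitor)
	c.mu.RLock()
	e, ok := c.entries[key]
	c.mu.RUnlock()
	if !ok || time.Now().After(e.expire) {
		return "", false // expired entries read as absent; the sweeper deletes them later
	}
	return e.value, true
}

// startJanitor purges expired entries on a fixed interval.
func (c *ttlCache) startJanitor() {
	go func() {
		ticker := time.NewTicker(15 * time.Minute)
		defer ticker.Stop()
		for range ticker.C {
			now := time.Now()
			c.mu.Lock()
			for k, e := range c.entries {
				if e.expire.Before(now) {
					delete(c.entries, k)
				}
			}
			c.mu.Unlock()
		}
	}()
}

The diff below instantiates this shape three times (failedAttempts, the signatureCache session map, codexCacheMap) and applies the slice-cap idiom once (modelStats.Details).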
@@ -24,8 +24,15 @@ import (
 type attemptInfo struct {
 	count        int
 	blockedUntil time.Time
+	lastActivity time.Time // track last activity for cleanup
 }
 
+// attemptCleanupInterval controls how often stale IP entries are purged
+const attemptCleanupInterval = 1 * time.Hour
+
+// attemptMaxIdleTime controls how long an IP can be idle before cleanup
+const attemptMaxIdleTime = 2 * time.Hour
+
 // Handler aggregates config reference, persistence path and helpers.
 type Handler struct {
 	cfg *config.Config
@@ -47,7 +54,7 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man
 	envSecret, _ := os.LookupEnv("MANAGEMENT_PASSWORD")
 	envSecret = strings.TrimSpace(envSecret)
 
-	return &Handler{
+	h := &Handler{
 		cfg:            cfg,
 		configFilePath: configFilePath,
 		failedAttempts: make(map[string]*attemptInfo),
@@ -57,6 +64,38 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man
 		allowRemoteOverride: envSecret != "",
 		envSecret:           envSecret,
 	}
+	h.startAttemptCleanup()
+	return h
+}
+
+// startAttemptCleanup launches a background goroutine that periodically
+// removes stale IP entries from failedAttempts to prevent memory leaks.
+func (h *Handler) startAttemptCleanup() {
+	go func() {
+		ticker := time.NewTicker(attemptCleanupInterval)
+		defer ticker.Stop()
+		for range ticker.C {
+			h.purgeStaleAttempts()
+		}
+	}()
+}
+
+// purgeStaleAttempts removes IP entries that have been idle beyond attemptMaxIdleTime
+// and whose ban (if any) has expired.
+func (h *Handler) purgeStaleAttempts() {
+	now := time.Now()
+	h.attemptsMu.Lock()
+	defer h.attemptsMu.Unlock()
+	for ip, ai := range h.failedAttempts {
+		// Skip if still banned
+		if !ai.blockedUntil.IsZero() && now.Before(ai.blockedUntil) {
+			continue
+		}
+		// Remove if idle too long
+		if now.Sub(ai.lastActivity) > attemptMaxIdleTime {
+			delete(h.failedAttempts, ip)
+		}
+	}
 }
 
 // NewHandler creates a new management handler instance.
@@ -149,6 +188,7 @@ func (h *Handler) Middleware() gin.HandlerFunc {
 			h.failedAttempts[clientIP] = aip
 		}
 		aip.count++
+		aip.lastActivity = time.Now()
 		if aip.count >= maxFailures {
 			aip.blockedUntil = time.Now().Add(banDuration)
 			aip.count = 0
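One design note on the hunks above: the cleanup goroutine is tied to the process lifetime, and nothing in this diff stops the ticker. That suits a single long-lived Handler; if handlers were ever created and discarded repeatedly, each would leak a goroutine. A stoppable variant would thread a done channel through; an illustrative sketch, not part of the commit:

// Hypothetical stoppable variant of startAttemptCleanup; the committed
// version runs for the life of the process, which fits a singleton Handler.
func (h *Handler) startAttemptCleanupWithStop(done <-chan struct{}) {
	go func() {
		ticker := time.NewTicker(attemptCleanupInterval)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				h.purgeStaleAttempts()
			}
		}
	}()
}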
internal/cache/signature_cache.go (vendored) | 43

@@ -26,11 +26,17 @@ const (
 
 	// MinValidSignatureLen is the minimum length for a signature to be considered valid
 	MinValidSignatureLen = 50
+
+	// SessionCleanupInterval controls how often stale sessions are purged
+	SessionCleanupInterval = 10 * time.Minute
 )
 
 // signatureCache stores signatures by sessionId -> textHash -> SignatureEntry
 var signatureCache sync.Map
+
+// sessionCleanupOnce ensures the background cleanup goroutine starts only once
+var sessionCleanupOnce sync.Once
 
 // sessionCache is the inner map type
 type sessionCache struct {
 	mu sync.RWMutex
@@ -45,6 +51,9 @@ func hashText(text string) string {
 
 // getOrCreateSession gets or creates a session cache
 func getOrCreateSession(sessionID string) *sessionCache {
+	// Start background cleanup on first access
+	sessionCleanupOnce.Do(startSessionCleanup)
+
 	if val, ok := signatureCache.Load(sessionID); ok {
 		return val.(*sessionCache)
 	}
@@ -53,6 +62,40 @@ func getOrCreateSession(sessionID string) *sessionCache {
 	return actual.(*sessionCache)
 }
+
+// startSessionCleanup launches a background goroutine that periodically
+// removes sessions where all entries have expired.
+func startSessionCleanup() {
+	go func() {
+		ticker := time.NewTicker(SessionCleanupInterval)
+		defer ticker.Stop()
+		for range ticker.C {
+			purgeExpiredSessions()
+		}
+	}()
+}
+
+// purgeExpiredSessions removes sessions with no valid (non-expired) entries.
+func purgeExpiredSessions() {
+	now := time.Now()
+	signatureCache.Range(func(key, value any) bool {
+		sc := value.(*sessionCache)
+		sc.mu.Lock()
+		// Remove expired entries
+		for k, entry := range sc.entries {
+			if now.Sub(entry.Timestamp) > SignatureCacheTTL {
+				delete(sc.entries, k)
+			}
+		}
+		isEmpty := len(sc.entries) == 0
+		sc.mu.Unlock()
+		// Remove session if empty
+		if isEmpty {
+			signatureCache.Delete(key)
+		}
+		return true
+	})
+}
 
 // CacheSignature stores a thinking signature for a given session and text.
 // Used for Claude models that require signed thinking blocks in multi-turn conversations.
 func CacheSignature(sessionID, text, signature string) {
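The sessionCleanupOnce.Do call above means an idle process starts no sweeper at all: the goroutine appears only when the first session is cached, and concurrent first callers race safely because sync.Once admits exactly one winner. The idiom in isolation, with illustrative names:

package sketch

import (
	"sync"
	"time"
)

var sweeperOnce sync.Once

// ensureSweeper starts sweep on a ticker exactly once, no matter how many
// goroutines call it concurrently; later calls are a cheap atomic check.
func ensureSweeper(sweep func(), interval time.Duration) {
	sweeperOnce.Do(func() {
		go func() {
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for range ticker.C {
				sweep()
			}
		}()
	})
}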
internal/memleak_compare_test.go (new file) | 219

@@ -0,0 +1,219 @@
+// Package internal demonstrates the memory leak that existed before the fix.
+// This file shows what happens WITHOUT the maxDetailsPerModel cap.
+package internal
+
+import (
+	"fmt"
+	"runtime"
+	"testing"
+	"time"
+)
+
+// UnboundedRequestStatistics is a copy of the ORIGINAL code WITHOUT the fix
+// to demonstrate the memory leak behavior.
+type UnboundedRequestStatistics struct {
+	totalRequests int64
+	apis          map[string]*unboundedAPIStats
+}
+
+type unboundedAPIStats struct {
+	TotalRequests int64
+	Models        map[string]*unboundedModelStats
+}
+
+type unboundedModelStats struct {
+	TotalRequests int64
+	Details       []unboundedRequestDetail // NO CAP - grows forever!
+}
+
+type unboundedRequestDetail struct {
+	Timestamp time.Time
+	Tokens    int64
+}
+
+func NewUnboundedRequestStatistics() *UnboundedRequestStatistics {
+	return &UnboundedRequestStatistics{
+		apis: make(map[string]*unboundedAPIStats),
+	}
+}
+
+// Record is the ORIGINAL implementation that leaks memory
+func (s *UnboundedRequestStatistics) Record(apiKey, model string, tokens int64) {
+	stats, ok := s.apis[apiKey]
+	if !ok {
+		stats = &unboundedAPIStats{Models: make(map[string]*unboundedModelStats)}
+		s.apis[apiKey] = stats
+	}
+	modelStats, ok := stats.Models[model]
+	if !ok {
+		modelStats = &unboundedModelStats{}
+		stats.Models[model] = modelStats
+	}
+	modelStats.TotalRequests++
+	// BUG: This grows forever with no cap!
+	modelStats.Details = append(modelStats.Details, unboundedRequestDetail{
+		Timestamp: time.Now(),
+		Tokens:    tokens,
+	})
+	s.totalRequests++
+}
+
+func (s *UnboundedRequestStatistics) CountDetails() int {
+	total := 0
+	for _, api := range s.apis {
+		for _, model := range api.Models {
+			total += len(model.Details)
+		}
+	}
+	return total
+}
+
+func TestMemoryLeak_BEFORE_Fix_Unbounded(t *testing.T) {
+	// This demonstrates the LEAK behavior before the fix
+	stats := NewUnboundedRequestStatistics()
+
+	var m runtime.MemStats
+	runtime.GC()
+	runtime.ReadMemStats(&m)
+	allocBefore := float64(m.Alloc) / 1024 / 1024
+
+	t.Logf("=== DEMONSTRATING LEAK (unbounded growth) ===")
+	t.Logf("Before: %.2f MB, Details: %d", allocBefore, stats.CountDetails())
+
+	// Simulate traffic over "hours" - in production this causes OOM
+	for hour := 1; hour <= 5; hour++ {
+		for i := 0; i < 20000; i++ {
+			stats.Record(
+				fmt.Sprintf("api-key-%d", i%10),
+				fmt.Sprintf("model-%d", i%5),
+				1500,
+			)
+		}
+		runtime.GC()
+		runtime.ReadMemStats(&m)
+		allocNow := float64(m.Alloc) / 1024 / 1024
+		t.Logf("Hour %d: %.2f MB, Details: %d (growth: +%.2f MB)",
+			hour, allocNow, stats.CountDetails(), allocNow-allocBefore)
+	}
+
+	// Show the problem: details count = total requests (unbounded)
+	totalDetails := stats.CountDetails()
+	totalRequests := 5 * 20000 // 100k requests
+	t.Logf("LEAK EVIDENCE: %d details stored for %d requests (ratio: %.2f)",
+		totalDetails, totalRequests, float64(totalDetails)/float64(totalRequests))
+
+	if totalDetails == totalRequests {
+		t.Logf("CONFIRMED: Every request stored forever = memory leak!")
+	}
+}
+
+func TestMemoryLeak_AFTER_Fix_Bounded(t *testing.T) {
+	// This demonstrates the FIXED behavior with capped growth
+	// Using the real implementation which now has the fix
+	stats := NewBoundedRequestStatistics()
+
+	var m runtime.MemStats
+	runtime.GC()
+	runtime.ReadMemStats(&m)
+	allocBefore := float64(m.Alloc) / 1024 / 1024
+
+	t.Logf("=== DEMONSTRATING FIX (bounded growth) ===")
+	t.Logf("Before: %.2f MB, Details: %d", allocBefore, stats.CountDetails())
+
+	for hour := 1; hour <= 5; hour++ {
+		for i := 0; i < 20000; i++ {
+			stats.Record(
+				fmt.Sprintf("api-key-%d", i%10),
+				fmt.Sprintf("model-%d", i%5),
+				1500,
+			)
+		}
+		runtime.GC()
+		runtime.ReadMemStats(&m)
+		allocNow := float64(m.Alloc) / 1024 / 1024
+		t.Logf("Hour %d: %.2f MB, Details: %d (growth: +%.2f MB)",
+			hour, allocNow, stats.CountDetails(), allocNow-allocBefore)
+	}
+
+	totalDetails := stats.CountDetails()
+	maxExpected := 10 * 5 * 1000 // 10 API keys * 5 models * 1000 cap = 50k max
+	t.Logf("FIX EVIDENCE: %d details stored (max possible: %d)", totalDetails, maxExpected)
+
+	if totalDetails <= maxExpected {
+		t.Logf("CONFIRMED: Details capped, memory bounded!")
+	} else {
+		t.Errorf("STILL LEAKING: %d > %d", totalDetails, maxExpected)
+	}
+}
+
+// BoundedRequestStatistics is the FIXED version with cap
+type BoundedRequestStatistics struct {
+	apis map[string]*boundedAPIStats
+}
+
+type boundedAPIStats struct {
+	Models map[string]*boundedModelStats
+}
+
+type boundedModelStats struct {
+	Details []unboundedRequestDetail
+}
+
+const maxDetailsPerModelTest = 1000
+
+func NewBoundedRequestStatistics() *BoundedRequestStatistics {
+	return &BoundedRequestStatistics{
+		apis: make(map[string]*boundedAPIStats),
+	}
+}
+
+func (s *BoundedRequestStatistics) Record(apiKey, model string, tokens int64) {
+	stats, ok := s.apis[apiKey]
+	if !ok {
+		stats = &boundedAPIStats{Models: make(map[string]*boundedModelStats)}
+		s.apis[apiKey] = stats
+	}
+	modelStats, ok := stats.Models[model]
+	if !ok {
+		modelStats = &boundedModelStats{}
+		stats.Models[model] = modelStats
+	}
+	modelStats.Details = append(modelStats.Details, unboundedRequestDetail{
+		Timestamp: time.Now(),
+		Tokens:    tokens,
+	})
+	// THE FIX: Cap the details slice
+	if len(modelStats.Details) > maxDetailsPerModelTest {
+		excess := len(modelStats.Details) - maxDetailsPerModelTest
+		modelStats.Details = modelStats.Details[excess:]
+	}
+}
+
+func (s *BoundedRequestStatistics) CountDetails() int {
+	total := 0
+	for _, api := range s.apis {
+		for _, model := range api.Models {
+			total += len(model.Details)
+		}
+	}
+	return total
+}
+
+func TestCompare_LeakVsFix(t *testing.T) {
+	t.Log("=== SIDE-BY-SIDE COMPARISON ===")
+
+	unbounded := NewUnboundedRequestStatistics()
+	bounded := NewBoundedRequestStatistics()
+
+	// Same workload
+	for i := 0; i < 50000; i++ {
+		apiKey := fmt.Sprintf("key-%d", i%10)
+		model := fmt.Sprintf("model-%d", i%5)
+		unbounded.Record(apiKey, model, 1500)
+		bounded.Record(apiKey, model, 1500)
+	}
+
+	t.Logf("UNBOUNDED (leak): %d details stored", unbounded.CountDetails())
+	t.Logf("BOUNDED (fixed): %d details stored", bounded.CountDetails())
+	t.Logf("Memory saved: %dx reduction", unbounded.CountDetails()/bounded.CountDetails())
+}
internal/memleak_repro_test.go (new file) | 151

@@ -0,0 +1,151 @@
+// Package internal contains memory leak reproduction tests.
+// Run with: go test -v -run TestMemoryLeak -memprofile=mem.prof ./internal/
+package internal
+
+import (
+	"context"
+	"fmt"
+	"runtime"
+	"testing"
+	"time"
+
+	"github.com/router-for-me/CLIProxyAPI/v6/internal/cache"
+	"github.com/router-for-me/CLIProxyAPI/v6/internal/usage"
+	coreusage "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage"
+)
+
+func getMemStats() (allocMB, heapMB float64) {
+	var m runtime.MemStats
+	runtime.GC()
+	runtime.ReadMemStats(&m)
+	return float64(m.Alloc) / 1024 / 1024, float64(m.HeapAlloc) / 1024 / 1024
+}
+
+func TestMemoryLeak_UsageStats(t *testing.T) {
+	// This test simulates the usage statistics leak where Details grows unbounded
+	stats := usage.NewRequestStatistics()
+
+	allocBefore, heapBefore := getMemStats()
+	t.Logf("Before: Alloc=%.2f MB, Heap=%.2f MB", allocBefore, heapBefore)
+
+	// Simulate 10k requests (would happen over hours/days in production)
+	numRequests := 10000
+	for i := 0; i < numRequests; i++ {
+		stats.Record(context.Background(), coreusage.Record{
+			Provider:    "test-provider",
+			Model:       fmt.Sprintf("model-%d", i%10), // 10 different models
+			APIKey:      fmt.Sprintf("api-key-%d", i%5),
+			RequestedAt: time.Now(),
+			Detail: coreusage.Detail{
+				InputTokens:  1000,
+				OutputTokens: 500,
+				TotalTokens:  1500,
+			},
+		})
+	}
+
+	allocAfter, heapAfter := getMemStats()
+	t.Logf("After %d requests: Alloc=%.2f MB, Heap=%.2f MB", numRequests, allocAfter, heapAfter)
+	t.Logf("Growth: Alloc=+%.2f MB, Heap=+%.2f MB", allocAfter-allocBefore, heapAfter-heapBefore)
+
+	// Verify the cap is working - check snapshot
+	snapshot := stats.Snapshot()
+	for apiName, apiSnap := range snapshot.APIs {
+		for modelName, modelSnap := range apiSnap.Models {
+			if len(modelSnap.Details) > 1000 {
+				t.Errorf("LEAK: API %s Model %s has %d details (should be <= 1000)",
+					apiName, modelName, len(modelSnap.Details))
+			} else {
+				t.Logf("OK: API %s Model %s has %d details (capped at 1000)",
+					apiName, modelName, len(modelSnap.Details))
+			}
+		}
+	}
+}
+
+func TestMemoryLeak_SignatureCache(t *testing.T) {
+	// This test simulates the signature cache leak where sessions accumulate
+	allocBefore, heapBefore := getMemStats()
+	t.Logf("Before: Alloc=%.2f MB, Heap=%.2f MB", allocBefore, heapBefore)
+
+	// Simulate 1000 unique sessions (each with signatures)
+	numSessions := 1000
+	sigText := string(make([]byte, 100)) // 100 byte signature text
+	sig := string(make([]byte, 200))     // 200 byte signature (> MinValidSignatureLen)
+
+	for i := 0; i < numSessions; i++ {
+		sessionID := fmt.Sprintf("session-%d", i)
+		// Each session caches 50 signatures
+		for j := 0; j < 50; j++ {
+			text := fmt.Sprintf("%s-text-%d", sigText, j)
+			signature := fmt.Sprintf("%s-sig-%d", sig, j)
+			cache.CacheSignature(sessionID, text, signature)
+		}
+	}
+
+	allocAfter, heapAfter := getMemStats()
+	t.Logf("After %d sessions x 50 sigs: Alloc=%.2f MB, Heap=%.2f MB",
+		numSessions, allocAfter, heapAfter)
+	t.Logf("Growth: Alloc=+%.2f MB, Heap=+%.2f MB", allocAfter-allocBefore, heapAfter-heapBefore)
+
+	// Clear all and check memory drops
+	cache.ClearSignatureCache("")
+	runtime.GC()
+
+	allocCleared, heapCleared := getMemStats()
+	t.Logf("After clear: Alloc=%.2f MB, Heap=%.2f MB", allocCleared, heapCleared)
+	t.Logf("Recovered: Alloc=%.2f MB, Heap=%.2f MB",
+		allocAfter-allocCleared, heapAfter-heapCleared)
+
+	if allocCleared > allocBefore*1.5 {
+		t.Logf("WARNING: Memory not fully recovered after clear (may indicate leak)")
+	}
+}
+
+func TestMemoryLeak_SimulateProductionLoad(t *testing.T) {
+	// Simulate realistic production load pattern over time
+	stats := usage.NewRequestStatistics()
+
+	t.Log("=== Simulating production load pattern ===")
+
+	// Phase 1: Ramp up
+	allocStart, _ := getMemStats()
+	t.Logf("Start: %.2f MB", allocStart)
+
+	// Simulate 1 hour of traffic (compressed into fast iterations)
+	// Real: ~1000 req/min = 60k/hour
+	// Test: 60k requests
+	for hour := 0; hour < 3; hour++ {
+		for i := 0; i < 20000; i++ {
+			stats.Record(context.Background(), coreusage.Record{
+				Provider:    "antigravity",
+				Model:       fmt.Sprintf("gemini-2.5-pro-%d", i%5),
+				APIKey:      fmt.Sprintf("user-%d", i%100),
+				RequestedAt: time.Now(),
+				Detail: coreusage.Detail{
+					InputTokens:  int64(1000 + i%500),
+					OutputTokens: int64(200 + i%100),
+					TotalTokens:  int64(1200 + i%600),
+				},
+			})
+		}
+		allocNow, _ := getMemStats()
+		t.Logf("Hour %d: %.2f MB (growth: +%.2f MB)", hour+1, allocNow, allocNow-allocStart)
+	}
+
+	allocEnd, _ := getMemStats()
+	totalGrowth := allocEnd - allocStart
+
+	// With the fix, growth should be bounded
+	// Without fix: would grow linearly with requests
+	// With fix: should plateau around 1000 details * num_models * detail_size
+	t.Logf("Total growth over 60k requests: %.2f MB", totalGrowth)
+
+	// Rough estimate: 1000 details * 5 models * 100 APIs * ~200 bytes = ~100MB max
+	// Should be well under 50MB for this test
+	if totalGrowth > 100 {
+		t.Errorf("POTENTIAL LEAK: Growth of %.2f MB is too high for bounded storage", totalGrowth)
+	} else {
+		t.Logf("OK: Memory growth is bounded at %.2f MB", totalGrowth)
+	}
+}
@@ -1,10 +1,68 @@
 package executor
 
-import "time"
+import (
+	"sync"
+	"time"
+)
 
 type codexCache struct {
 	ID     string
 	Expire time.Time
 }
 
-var codexCacheMap = map[string]codexCache{}
+// codexCacheMap stores prompt cache IDs keyed by model+user_id.
+// Protected by codexCacheMu. Entries expire after 1 hour.
+var (
+	codexCacheMap = make(map[string]codexCache)
+	codexCacheMu  sync.RWMutex
+)
+
+// codexCacheCleanupInterval controls how often expired entries are purged.
+const codexCacheCleanupInterval = 15 * time.Minute
+
+// codexCacheCleanupOnce ensures the background cleanup goroutine starts only once.
+var codexCacheCleanupOnce sync.Once
+
+// startCodexCacheCleanup launches a background goroutine that periodically
+// removes expired entries from codexCacheMap to prevent memory leaks.
+func startCodexCacheCleanup() {
+	go func() {
+		ticker := time.NewTicker(codexCacheCleanupInterval)
+		defer ticker.Stop()
+		for range ticker.C {
+			purgeExpiredCodexCache()
+		}
+	}()
+}
+
+// purgeExpiredCodexCache removes entries that have expired.
+func purgeExpiredCodexCache() {
+	now := time.Now()
+	codexCacheMu.Lock()
+	defer codexCacheMu.Unlock()
+	for key, cache := range codexCacheMap {
+		if cache.Expire.Before(now) {
+			delete(codexCacheMap, key)
+		}
+	}
+}
+
+// getCodexCache retrieves a cached entry, returning ok=false if not found or expired.
+func getCodexCache(key string) (codexCache, bool) {
+	codexCacheCleanupOnce.Do(startCodexCacheCleanup)
+	codexCacheMu.RLock()
+	cache, ok := codexCacheMap[key]
+	codexCacheMu.RUnlock()
+	if !ok || cache.Expire.Before(time.Now()) {
+		return codexCache{}, false
+	}
+	return cache, true
+}
+
+// setCodexCache stores a cache entry.
+func setCodexCache(key string, cache codexCache) {
+	codexCacheCleanupOnce.Do(startCodexCacheCleanup)
+	codexCacheMu.Lock()
+	codexCacheMap[key] = cache
+	codexCacheMu.Unlock()
+}
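With the accessors above, reads take only an RLock, but the get-then-set sequence in cacheHelper (next hunk) is check-then-act rather than atomic: two concurrent misses on the same key can each mint a UUID, and the later setCodexCache wins. That is harmless for a prompt-cache hint. A strict variant would hold the write lock across both steps; an illustrative sketch, not in the commit:

// getOrCreateCodexCache is a hypothetical atomic variant: one write lock
// covers lookup and insert, so only one ID is ever minted per key.
func getOrCreateCodexCache(key string, ttl time.Duration) codexCache {
	codexCacheMu.Lock()
	defer codexCacheMu.Unlock()
	if cache, ok := codexCacheMap[key]; ok && cache.Expire.After(time.Now()) {
		return cache
	}
	cache := codexCache{ID: uuid.New().String(), Expire: time.Now().Add(ttl)}
	codexCacheMap[key] = cache
	return cache
}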
@@ -457,14 +457,14 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Form
 	if from == "claude" {
 		userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id")
 		if userIDResult.Exists() {
-			var hasKey bool
 			key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String())
-			if cache, hasKey = codexCacheMap[key]; !hasKey || cache.Expire.Before(time.Now()) {
+			var ok bool
+			if cache, ok = getCodexCache(key); !ok {
 				cache = codexCache{
 					ID:     uuid.New().String(),
 					Expire: time.Now().Add(1 * time.Hour),
 				}
-				codexCacheMap[key] = cache
+				setCodexCache(key, cache)
 			}
 		}
 	} else if from == "openai-response" {
@@ -87,6 +87,10 @@ type modelStats struct {
 	Details []RequestDetail
 }
+
+// maxDetailsPerModel limits the number of request details retained per model
+// to prevent unbounded memory growth. Oldest entries are dropped when exceeded.
+const maxDetailsPerModel = 1000
 
 // RequestDetail stores the timestamp and token usage for a single request.
 type RequestDetail struct {
 	Timestamp time.Time `json:"timestamp"`
@@ -221,6 +225,11 @@ func (s *RequestStatistics) updateAPIStats(stats *apiStats, model string, detail
 	modelStatsValue.TotalRequests++
 	modelStatsValue.TotalTokens += detail.Tokens.TotalTokens
 	modelStatsValue.Details = append(modelStatsValue.Details, detail)
+	// Prevent unbounded growth by dropping oldest entries when limit exceeded
+	if len(modelStatsValue.Details) > maxDetailsPerModel {
+		excess := len(modelStatsValue.Details) - maxDetailsPerModel
+		modelStatsValue.Details = modelStatsValue.Details[excess:]
+	}
 }
 
 // Snapshot returns a copy of the aggregated metrics for external consumption.
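A caveat on the cap above: re-slicing with Details[excess:] bounds the length, but the dropped prefix stays reachable through the shared backing array until a later append reallocates, so memory settles at roughly the array capacity rather than exactly 1000 entries. If tighter bounds ever mattered, a copying variant releases the old array immediately; an illustrative alternative, not what the commit does:

// Hypothetical copying variant of the cap in updateAPIStats.
if len(modelStatsValue.Details) > maxDetailsPerModel {
	trimmed := make([]RequestDetail, maxDetailsPerModel)
	copy(trimmed, modelStatsValue.Details[len(modelStatsValue.Details)-maxDetailsPerModel:])
	modelStatsValue.Details = trimmed // the old backing array becomes garbage now
}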