From 3b421c8181c93393ac715d8281cefd06c68d2e03 Mon Sep 17 00:00:00 2001 From: piexian <64474352+piexian@users.noreply.github.com> Date: Mon, 23 Feb 2026 00:38:46 +0800 Subject: [PATCH] feat(qwen): add rate limiting and quota error handling - Add 60 requests/minute rate limiting per credential using sliding window - Detect insufficient_quota errors and set cooldown until next day (Beijing time) - Map quota errors (HTTP 403/429) to 429 with retryAfter for conductor integration - Cache Beijing timezone at package level to avoid repeated syscalls - Add redactAuthID function to protect credentials in logs - Extract wrapQwenError helper to consolidate error handling --- internal/runtime/executor/qwen_executor.go | 185 ++++++++++++++++++++- 1 file changed, 176 insertions(+), 9 deletions(-) diff --git a/internal/runtime/executor/qwen_executor.go b/internal/runtime/executor/qwen_executor.go index bcc4a057..e7957d29 100644 --- a/internal/runtime/executor/qwen_executor.go +++ b/internal/runtime/executor/qwen_executor.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "strings" + "sync" "time" qwenauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/qwen" @@ -22,9 +23,151 @@ import ( ) const ( - qwenUserAgent = "QwenCode/0.10.3 (darwin; arm64)" + qwenUserAgent = "QwenCode/0.10.3 (darwin; arm64)" + qwenRateLimitPerMin = 60 // 60 requests per minute per credential + qwenRateLimitWindow = time.Minute // sliding window duration ) +// qwenBeijingLoc caches the Beijing timezone to avoid repeated LoadLocation syscalls. +var qwenBeijingLoc = func() *time.Location { + loc, err := time.LoadLocation("Asia/Shanghai") + if err != nil || loc == nil { + log.Warnf("qwen: failed to load Asia/Shanghai timezone: %v, using fixed UTC+8", err) + return time.FixedZone("CST", 8*3600) + } + return loc +}() + +// qwenQuotaCodes is a package-level set of error codes that indicate quota exhaustion. +var qwenQuotaCodes = map[string]struct{}{ + "insufficient_quota": {}, + "quota_exceeded": {}, +} + +// qwenRateLimiter tracks request timestamps per credential for rate limiting. +// Qwen has a limit of 60 requests per minute per account. +var qwenRateLimiter = struct { + sync.Mutex + requests map[string][]time.Time // authID -> request timestamps +}{ + requests: make(map[string][]time.Time), +} + +// redactAuthID returns a redacted version of the auth ID for safe logging. +// Keeps a small prefix/suffix to allow correlation across events. +func redactAuthID(id string) string { + if id == "" { + return "" + } + if len(id) <= 8 { + return id + } + return id[:4] + "..." + id[len(id)-4:] +} + +// checkQwenRateLimit checks if the credential has exceeded the rate limit. +// Returns nil if allowed, or a statusErr with retryAfter if rate limited. +func checkQwenRateLimit(authID string) error { + if authID == "" { + // Empty authID should not bypass rate limiting in production + // Use debug level to avoid log spam for certain auth flows + log.Debug("qwen rate limit check: empty authID, skipping rate limit") + return nil + } + + now := time.Now() + windowStart := now.Add(-qwenRateLimitWindow) + + qwenRateLimiter.Lock() + defer qwenRateLimiter.Unlock() + + // Get and filter timestamps within the window + timestamps := qwenRateLimiter.requests[authID] + var validTimestamps []time.Time + for _, ts := range timestamps { + if ts.After(windowStart) { + validTimestamps = append(validTimestamps, ts) + } + } + + // Always prune expired entries to prevent memory leak + // Delete empty entries, otherwise update with pruned slice + if len(validTimestamps) == 0 { + delete(qwenRateLimiter.requests, authID) + } + + // Check if rate limit exceeded + if len(validTimestamps) >= qwenRateLimitPerMin { + // Calculate when the oldest request will expire + oldestInWindow := validTimestamps[0] + retryAfter := oldestInWindow.Add(qwenRateLimitWindow).Sub(now) + if retryAfter < time.Second { + retryAfter = time.Second + } + retryAfterSec := int(retryAfter.Seconds()) + return statusErr{ + code: http.StatusTooManyRequests, + msg: fmt.Sprintf(`{"error":{"code":"rate_limit_exceeded","message":"Qwen rate limit: %d requests/minute exceeded, retry after %ds","type":"rate_limit_exceeded"}}`, qwenRateLimitPerMin, retryAfterSec), + retryAfter: &retryAfter, + } + } + + // Record this request and update the map with pruned timestamps + validTimestamps = append(validTimestamps, now) + qwenRateLimiter.requests[authID] = validTimestamps + + return nil +} + +// isQwenQuotaError checks if the error response indicates a quota exceeded error. +// Qwen returns HTTP 403 with error.code="insufficient_quota" when daily quota is exhausted. +func isQwenQuotaError(body []byte) bool { + code := strings.ToLower(gjson.GetBytes(body, "error.code").String()) + errType := strings.ToLower(gjson.GetBytes(body, "error.type").String()) + + // Primary check: exact match on error.code or error.type (most reliable) + if _, ok := qwenQuotaCodes[code]; ok { + return true + } + if _, ok := qwenQuotaCodes[errType]; ok { + return true + } + + // Fallback: check message only if code/type don't match (less reliable) + msg := strings.ToLower(gjson.GetBytes(body, "error.message").String()) + if strings.Contains(msg, "insufficient_quota") || strings.Contains(msg, "quota exceeded") || + strings.Contains(msg, "free allocated quota exceeded") { + return true + } + + return false +} + +// wrapQwenError wraps an HTTP error response, detecting quota errors and mapping them to 429. +// Returns the appropriate status code and retryAfter duration for statusErr. +// Only checks for quota errors when httpCode is 403 or 429 to avoid false positives. +func wrapQwenError(ctx context.Context, httpCode int, body []byte) (errCode int, retryAfter *time.Duration) { + errCode = httpCode + // Only check quota errors for expected status codes to avoid false positives + // Qwen returns 403 for quota errors, 429 for rate limits + if (httpCode == http.StatusForbidden || httpCode == http.StatusTooManyRequests) && isQwenQuotaError(body) { + errCode = http.StatusTooManyRequests // Map to 429 to trigger quota logic + cooldown := timeUntilNextDay() + retryAfter = &cooldown + logWithRequestID(ctx).Warnf("qwen quota exceeded (http %d -> %d), cooling down until tomorrow (%v)", httpCode, errCode, cooldown) + } + return errCode, retryAfter +} + +// timeUntilNextDay returns duration until midnight Beijing time (UTC+8). +// Qwen's daily quota resets at 00:00 Beijing time. +func timeUntilNextDay() time.Duration { + now := time.Now() + nowLocal := now.In(qwenBeijingLoc) + tomorrow := time.Date(nowLocal.Year(), nowLocal.Month(), nowLocal.Day()+1, 0, 0, 0, 0, qwenBeijingLoc) + return tomorrow.Sub(now) +} + // QwenExecutor is a stateless executor for Qwen Code using OpenAI-compatible chat completions. // If access token is unavailable, it falls back to legacy via ClientAdapter. type QwenExecutor struct { @@ -67,6 +210,17 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req if opts.Alt == "responses/compact" { return resp, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"} } + + // Check rate limit before proceeding + var authID string + if auth != nil { + authID = auth.ID + } + if err := checkQwenRateLimit(authID); err != nil { + logWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) + return resp, err + } + baseModel := thinking.ParseSuffix(req.Model).ModelName token, baseURL := qwenCreds(auth) @@ -102,9 +256,8 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req return resp, err } applyQwenHeaders(httpReq, token, false) - var authID, authLabel, authType, authValue string + var authLabel, authType, authValue string if auth != nil { - authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } @@ -135,8 +288,10 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) - err = statusErr{code: httpResp.StatusCode, msg: string(b)} + + errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) + logWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} return resp, err } data, err := io.ReadAll(httpResp.Body) @@ -158,6 +313,17 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut if opts.Alt == "responses/compact" { return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"} } + + // Check rate limit before proceeding + var authID string + if auth != nil { + authID = auth.ID + } + if err := checkQwenRateLimit(authID); err != nil { + logWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) + return nil, err + } + baseModel := thinking.ParseSuffix(req.Model).ModelName token, baseURL := qwenCreds(auth) @@ -200,9 +366,8 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut return nil, err } applyQwenHeaders(httpReq, token, true) - var authID, authLabel, authType, authValue string + var authLabel, authType, authValue string if auth != nil { - authID = auth.ID authLabel = auth.Label authType, authValue = auth.AccountInfo() } @@ -228,11 +393,13 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) - logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + + errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) + logWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("qwen executor: close response body error: %v", errClose) } - err = statusErr{code: httpResp.StatusCode, msg: string(b)} + err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} return nil, err } out := make(chan cliproxyexecutor.StreamChunk)