Add nil-check for GetRequestMutex across handlers to prevent potential panics

- Updated all handlers to safely unlock the request mutex only if it's non-nil.
- Enhanced mutex locking and unlocking logic to avoid runtime errors.
- Improved robustness of resource cleanup across clients.

Add `GetRequestMutex` method for synchronization across clients

- Introduced a new `GetRequestMutex` method in OpenAICompatibilityClient, CodexClient, GeminiCLIClient, GeminiClient, and QwenClient for request synchronization.
- Ensures only one request is processed at a time to manage quotas effectively.
This commit is contained in:
Luis Pater
2025-08-28 02:47:15 +08:00
parent bea5f97cbf
commit 3704dae342
11 changed files with 95 additions and 15 deletions

View File

@@ -133,7 +133,9 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
// Ensure the client's mutex is unlocked on function exit.
// This prevents deadlocks and ensures proper resource cleanup
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
retryCount := 0

View File

@@ -163,7 +163,9 @@ func (h *GeminiCLIAPIHandler) handleInternalStreamGenerateContent(c *gin.Context
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -244,7 +246,9 @@ func (h *GeminiCLIAPIHandler) handleInternalGenerateContent(c *gin.Context, rawJ
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()

View File

@@ -214,7 +214,9 @@ func (h *GeminiAPIHandler) handleStreamGenerateContent(c *gin.Context, modelName
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -303,7 +305,9 @@ func (h *GeminiAPIHandler) handleCountTokens(c *gin.Context, modelName string, r
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -354,7 +358,9 @@ func (h *GeminiAPIHandler) handleGenerateContent(c *gin.Context, modelName strin
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()

View File

@@ -102,6 +102,8 @@ func (h *BaseAPIHandler) GetClient(modelName string, isGenerateContent ...bool)
}
}
// Lock the mutex to update the last used client index
h.Mutex.Lock()
if _, hasKey := h.LastUsedClientIndex[modelName]; !hasKey {
h.LastUsedClientIndex[modelName] = 0
}
@@ -112,8 +114,6 @@ func (h *BaseAPIHandler) GetClient(modelName string, isGenerateContent ...bool)
var cliClient interfaces.Client
// Lock the mutex to update the last used client index
h.Mutex.Lock()
startIndex := h.LastUsedClientIndex[modelName]
if (len(isGenerateContent) > 0 && isGenerateContent[0]) || len(isGenerateContent) == 0 {
currentIndex := (startIndex + 1) % len(clients)
@@ -157,14 +157,20 @@ func (h *BaseAPIHandler) GetClient(modelName string, isGenerateContent ...bool)
locked := false
for i := 0; i < len(reorderedClients); i++ {
cliClient = reorderedClients[i]
if cliClient.GetRequestMutex().TryLock() {
if mutex := cliClient.GetRequestMutex(); mutex != nil {
if mutex.TryLock() {
locked = true
break
}
} else {
locked = true
}
}
if !locked {
cliClient = clients[0]
cliClient.GetRequestMutex().Lock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Lock()
}
}
return cliClient, nil

View File

@@ -380,7 +380,9 @@ func (h *OpenAIAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSON []
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -454,7 +456,9 @@ func (h *OpenAIAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byt
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -543,7 +547,9 @@ func (h *OpenAIAPIHandler) handleCompletionsNonStreamingResponse(c *gin.Context,
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -623,7 +629,9 @@ func (h *OpenAIAPIHandler) handleCompletionsStreamingResponse(c *gin.Context, ra
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()

View File

@@ -557,3 +557,12 @@ func (c *ClaudeClient) IsModelQuotaExceeded(model string) bool {
}
return false
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *ClaudeClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -430,3 +430,12 @@ func (c *CodexClient) IsModelQuotaExceeded(model string) bool {
}
return false
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *CodexClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -851,3 +851,12 @@ func (c *GeminiCLIClient) GetUserAgent() string {
// return fmt.Sprintf("GeminiCLI/%s (%s; %s)", pluginVersion, runtime.GOOS, runtime.GOARCH)
return "google-api-nodejs-client/9.15.1"
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *GeminiCLIClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -425,3 +425,12 @@ func (c *GeminiClient) GetUserAgent() string {
// return fmt.Sprintf("GeminiCLI/%s (%s; %s)", pluginVersion, runtime.GOOS, runtime.GOARCH)
return "google-api-nodejs-client/9.15.1"
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *GeminiClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -390,3 +390,12 @@ func (c *OpenAICompatibilityClient) RefreshTokens(ctx context.Context) error {
// API keys don't need refreshing
return nil
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *OpenAICompatibilityClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -432,3 +432,12 @@ func (c *QwenClient) IsModelQuotaExceeded(model string) bool {
}
return false
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *QwenClient) GetRequestMutex() *sync.Mutex {
return nil
}